You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:05 UTC
[24/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index d7dc918..9de1863 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -18,14 +18,6 @@
package org.apache.streams.rss.provider;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
@@ -37,6 +29,15 @@ import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,173 +62,159 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format
- *
- * To use from command line:
- *
- * Supply configuration similar to src/test/resources/rss.conf
- *
- * Launch using:
- *
- * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
*/
public class RssStreamProvider implements StreamsProvider {
- public static final String STREAMS_ID = "RssStreamProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
-
- private final static int MAX_SIZE = 1000;
-
- private RssStreamConfiguration config;
- private boolean perpetual;
- private ExecutorService executor;
- private BlockingQueue<StreamsDatum> dataQueue;
- private AtomicBoolean isComplete;
-
- @VisibleForTesting
- protected RssFeedScheduler scheduler;
-
- public RssStreamProvider() {
- this(new ComponentConfigurator<>(RssStreamConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false);
- }
-
- public RssStreamProvider(boolean perpetual) {
- this(new ComponentConfigurator<>(RssStreamConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual);
- }
-
- public RssStreamProvider(RssStreamConfiguration config) {
- this(config, false);
- }
-
- public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) {
- this.perpetual = perpetual;
- this.config = config;
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- public void setConfig(RssStreamConfiguration config) {
- this.config = config;
- }
-
- public void setRssFeeds(Set<String> urlFeeds) {
- }
-
- public void setRssFeeds(Map<String, Long> feeds) {
- if(this.config == null) {
- this.config = new RssStreamConfiguration();
- }
- List<FeedDetails> feedDetails = new ArrayList<>();
- for(String feed : feeds.keySet()) {
- Long delay = feeds.get(feed);
- FeedDetails detail = new FeedDetails();
- detail.setUrl(feed);
- detail.setPollIntervalMillis(delay);
- feedDetails.add(detail);
- }
- this.config.setFeeds(feedDetails);
- }
-
- @Override
- public void startStream() {
- LOGGER.trace("Starting Rss Scheduler");
- this.executor.submit(this.scheduler);
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
- int batchSize = 0;
- while(!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) {
- StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue);
- if(datum != null) {
- ++batchSize;
- batch.add(datum);
- }
+ public static final String STREAMS_ID = "RssStreamProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
+
+ private static final int MAX_SIZE = 1000;
+
+ private RssStreamConfiguration config;
+ private boolean perpetual;
+ private ExecutorService executor;
+ private BlockingQueue<StreamsDatum> dataQueue;
+ private AtomicBoolean isComplete;
+
+ @VisibleForTesting
+ protected RssFeedScheduler scheduler;
+
+ public RssStreamProvider() {
+ this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false);
+ }
+
+ public RssStreamProvider(boolean perpetual) {
+ this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual);
+ }
+
+ public RssStreamProvider(RssStreamConfiguration config) {
+ this(config, false);
+ }
+
+ public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) {
+ this.perpetual = perpetual;
+ this.config = config;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ LOGGER.trace("Starting Rss Scheduler");
+ this.executor.submit(this.scheduler);
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
+ int batchSize = 0;
+ while (!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) {
+ StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue);
+ if (datum != null) {
+ ++batchSize;
+ batch.add(datum);
+ }
+ }
+ this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty());
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.isComplete.get();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ this.dataQueue = new LinkedBlockingQueue<>();
+ this.scheduler = getScheduler(this.dataQueue);
+ this.isComplete = new AtomicBoolean(false);
+ int consecutiveEmptyReads = 0;
+ }
+
+ @VisibleForTesting
+ protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
+ if (this.perpetual) {
+ return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue);
+ } else {
+ return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0);
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ this.scheduler.stop();
+ ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+ }
+
+ /**
+ * To use from command line:
+ *
+ * <p/>
+ * Supply configuration similar to src/test/resources/rss.conf
+ *
+ * <p/>
+ * Launch using:
+ *
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
+ * @param args args
+ * @throws Exception Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File file = new File(configfile);
+ assert (file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
+ RssStreamProvider provider = new RssStreamProvider(config);
+
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ for (StreamsDatum datum : provider.readCurrent()) {
+ String json;
+ try {
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
}
- this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty());
- return new StreamsResultSet(batch);
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return !this.isComplete.get();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- this.dataQueue = new LinkedBlockingQueue<>();
- this.scheduler = getScheduler(this.dataQueue);
- this.isComplete = new AtomicBoolean(false);
- int consecutiveEmptyReads = 0;
- }
-
- @VisibleForTesting
- protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
- if(this.perpetual)
- return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue);
- else
- return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0);
- }
-
- @Override
- public void cleanUp() {
- this.scheduler.stop();
- ComponentUtils.shutdownExecutor(this.executor, 10, 10);
- }
-
- public static void main(String[] args) throws Exception {
-
- Preconditions.checkArgument(args.length >= 2);
-
- String configfile = args[0];
- String outfile = args[1];
-
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
-
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
- RssStreamProvider provider = new RssStreamProvider(config);
-
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- for (StreamsDatum datum : provider.readCurrent()) {
- String json;
- try {
- json = mapper.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
+ }
}
+ while ( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
index 3800a51..03a66d1 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
@@ -18,27 +18,23 @@
package org.apache.streams.rss.provider;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.rss.serializer.SyndEntrySerializer;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
-import com.sun.syndication.io.XmlReader;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.rss.FeedDetails;
-import org.apache.streams.rss.serializer.SyndEntrySerializer;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Collections;
@@ -50,15 +46,19 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* A {@link java.lang.Runnable} task that queues rss feed data.
*
+ * <p/>
* <code>RssStreamProviderTask</code> reads the content of an rss feed and queues the articles from
* the feed inform of a {@link com.fasterxml.jackson.databind.node.ObjectNode} wrapped in a {@link org.apache.streams.core.StreamsDatum}.
* The task can filter articles by a published date. If the task cannot parse the date of the article or the article does not contain a
* published date, by default the task will attempt to queue article.
*
- * A task can be run in perpetual mode which will store the article urls in a static variable. The next time a <code>RssStreamProviderTask</code>
- * is run, it will not queue data that was seen the previous time the rss feed was read. This is an attempt to reduce
- * multiple copies of an article from being out put by a {@link org.apache.streams.rss.provider.RssStreamProvider}.
+ * <p/>
+ * A task can be run in perpetual mode which will store the article urls in a static variable. The next time a
+ * <code>RssStreamProviderTask</code> is run, it will not queue data that was seen the previous time the rss feed was read.
+ * This is an attempt to reduce multiple copies of an article from being output by a
+ * {@link org.apache.streams.rss.provider.RssStreamProvider}.
*
+ * <p/>
* ** Warning! **
* It still is possible to output multiples of the same article. If multiple tasks executions for the same rss feed overlap
* in execution time, it possible that the previously seen articles static variable will not have been updated in time.
@@ -66,183 +66,187 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class RssStreamProviderTask implements Runnable {
- private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class);
- private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds
- private static final String RSS_KEY = "rssFeed";
- private static final String URI_KEY = "uri";
- private static final String LINK_KEY = "link";
- private static final String DATE_KEY = "publishedDate";
-
- /**
- * Map that contains the Set of previously seen articles by an rss feed.
- */
- @VisibleForTesting
- protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>();
-
-
- private BlockingQueue<StreamsDatum> dataQueue;
- private String rssFeed;
- private int timeOut;
- private SyndEntrySerializer serializer;
- private DateTime publishedSince;
- private boolean perpetual;
-
-
- /**
- * Non-perpetual mode, no date filter, time out of 10 sec
- * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
- * @param queue
- * @param rssFeed
- */
- public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) {
- this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false);
- }
-
- /**
- * Non-perpetual mode, no date filter
- * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
- * @param queue
- * @param rssFeed
- * @param timeOut
- */
- public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) {
- this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false);
- }
-
- /**
- * Non-perpetual mode, time out of 10 sec
- * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
- * @param queue
- * @param rssFeed
- * @param publishedSince
- */
- public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) {
- this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false);
- }
-
- /**
- * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents
- * being object nodes.
- * @param queue Queue to push data to
- * @param rssFeed url of rss feed to read
- * @param publishedSince DateTime to filter articles by, will queue articles with published times after this
- * @param timeOut url connection timeout in milliseconds
- * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED
- */
- public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) {
- this.dataQueue = queue;
- this.rssFeed = rssFeed;
- this.timeOut = timeOut;
- this.publishedSince = publishedSince;
- this.serializer = new SyndEntrySerializer();
- this.perpetual = perpetual;
- }
-
- /**
- * The rss feed url that this task is responsible for reading
- * @return rss feed url
- */
- public String getRssFeed() {
- return this.rssFeed;
- }
-
- @Override
- public void run() {
- try {
- Set<String> batch = queueFeedEntries(new URL(this.rssFeed));
- if(this.perpetual)
- PREVIOUSLY_SEEN.put(this.getRssFeed(), batch);
- } catch (IOException | FeedException e) {
- LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, e);
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class);
+ private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds
+ private static final String RSS_KEY = "rssFeed";
+ private static final String URI_KEY = "uri";
+ private static final String LINK_KEY = "link";
+ private static final String DATE_KEY = "publishedDate";
+
+ /**
+ * Map that contains the Set of previously seen articles by an rss feed.
+ */
+ @VisibleForTesting
+ protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>();
+
+
+ private BlockingQueue<StreamsDatum> dataQueue;
+ private String rssFeed;
+ private int timeOut;
+ private SyndEntrySerializer serializer;
+ private DateTime publishedSince;
+ private boolean perpetual;
+
+
+ /**
+ * Non-perpetual mode, no date filter, time out of 10 sec
+ * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+ * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+ * @param queue queue
+ * @param rssFeed rssFeed
+ */
+ public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) {
+ this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false);
+ }
+
+ /**
+ * Non-perpetual mode, no date filter.
+ * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+ * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+ * @param queue queue
+ * @param rssFeed rssFeed
+ * @param timeOut timeOut
+ */
+ public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) {
+ this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false);
+ }
+
+ /**
+ * Non-perpetual mode, time out of 10 sec
+ * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+ * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+ * @param queue queue
+ * @param rssFeed rssFeed
+ * @param publishedSince publishedSince
+ */
+ public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) {
+ this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false);
+ }
+
+ /**
+ * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents
+ * being object nodes.
+ * @param queue Queue to push data to
+ * @param rssFeed url of rss feed to read
+ * @param publishedSince DateTime to filter articles by, will queue articles with published times after this
+ * @param timeOut url connection timeout in milliseconds
+ * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED
+ */
+ public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) {
+ this.dataQueue = queue;
+ this.rssFeed = rssFeed;
+ this.timeOut = timeOut;
+ this.publishedSince = publishedSince;
+ this.serializer = new SyndEntrySerializer();
+ this.perpetual = perpetual;
+ }
+
+ /**
+ * The rss feed url that this task is responsible for reading.
+ * @return rss feed url
+ */
+ public String getRssFeed() {
+ return this.rssFeed;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Set<String> batch = queueFeedEntries(new URL(this.rssFeed));
+ if (this.perpetual) {
+ PREVIOUSLY_SEEN.put(this.getRssFeed(), batch);
+ }
+ } catch (IOException | FeedException ex) {
+ LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, ex);
}
-
- /**
- * Reads the url and queues the data
- * @param feedUrl rss feed url
- * @return set of all article urls that were read from the feed
- * @throws IOException when it cannot connect to the url or the url is malformed
- * @throws FeedException when it cannot reed the feed.
- */
- @VisibleForTesting
- protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {
-
- // ConcurrentHashSet is preferable, but it's only in guava 15+
- // spark 1.5.0 uses guava 14 so for the moment this is the workaround
- Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- URLConnection connection = feedUrl.openConnection();
- connection.setConnectTimeout(this.timeOut);
- connection.setConnectTimeout(this.timeOut);
- SyndFeedInput input = new SyndFeedInput();
- SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
- for (Object entryObj : feed.getEntries()) {
- SyndEntry entry = (SyndEntry) entryObj;
- ObjectNode nodeEntry = this.serializer.deserialize(entry);
- nodeEntry.put(RSS_KEY, this.rssFeed);
- String entryId = determineId(nodeEntry);
- batch.add(entryId);
- StreamsDatum datum = new StreamsDatum(nodeEntry);
- try {
- JsonNode published = nodeEntry.get(DATE_KEY);
- if (published != null) {
- try {
- DateTime date = RFC3339Utils.parseToUTC(published.asText());
- if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
- this.dataQueue.put(datum);
- LOGGER.debug("Added entry, {}, to provider queue.", entryId);
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
- if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
- this.dataQueue.put(datum);
- LOGGER.debug("Added entry, {}, to provider queue.", entryId);
- }
- }
- } else {
- LOGGER.debug("No published date present, attempting to add node to queue by default.");
- if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
- this.dataQueue.put(datum);
- LOGGER.debug("Added entry, {}, to provider queue.", entryId);
- }
- }
- } catch (InterruptedException ie) {
- LOGGER.error("Interupted Exception.");
- Thread.currentThread().interrupt();
+ }
+
+ /**
+ * Reads the url and queues the data
+ * @param feedUrl rss feed url
+ * @return set of all article urls that were read from the feed
+ * @throws IOException when it cannot connect to the url or the url is malformed
+ * @throws FeedException when it cannot read the feed.
+ */
+ @VisibleForTesting
+ protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {
+
+ // ConcurrentHashSet is preferable, but it's only in guava 15+
+ // spark 1.5.0 uses guava 14 so for the moment this is the workaround
+ Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ URLConnection connection = feedUrl.openConnection();
+ connection.setConnectTimeout(this.timeOut);
+ connection.setConnectTimeout(this.timeOut);
+ SyndFeedInput input = new SyndFeedInput();
+ SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
+ for (Object entryObj : feed.getEntries()) {
+ SyndEntry entry = (SyndEntry) entryObj;
+ ObjectNode nodeEntry = this.serializer.deserialize(entry);
+ nodeEntry.put(RSS_KEY, this.rssFeed);
+ String entryId = determineId(nodeEntry);
+ batch.add(entryId);
+ StreamsDatum datum = new StreamsDatum(nodeEntry);
+ try {
+ JsonNode published = nodeEntry.get(DATE_KEY);
+ if (published != null) {
+ try {
+ DateTime date = RFC3339Utils.parseToUTC(published.asText());
+ if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
+ this.dataQueue.put(datum);
+ LOGGER.debug("Added entry, {}, to provider queue.", entryId);
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (Exception ex) {
+ LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
+ if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
+ this.dataQueue.put(datum);
+ LOGGER.debug("Added entry, {}, to provider queue.", entryId);
}
+ }
+ } else {
+ LOGGER.debug("No published date present, attempting to add node to queue by default.");
+ if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
+ this.dataQueue.put(datum);
+ LOGGER.debug("Added entry, {}, to provider queue.", entryId);
+ }
}
- return batch;
+ } catch (InterruptedException ie) {
+ LOGGER.error("Interupted Exception.");
+ Thread.currentThread().interrupt();
+ }
}
-
- /**
- * Returns a link to the article to use as the id
- * @param node
- * @return
- */
- private String determineId(ObjectNode node) {
- String id = null;
- if(node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) {
- id = node.get(URI_KEY).textValue();
- } else if(node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) {
- id = node.get(LINK_KEY).textValue();
- }
- return id;
+ return batch;
+ }
+
+ /**
+ * Returns link to the article to use as the id.
+ * @param node node
+ * @return String
+ */
+ private String determineId(ObjectNode node) {
+ String id = null;
+ if (node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) {
+ id = node.get(URI_KEY).textValue();
+ } else if (node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) {
+ id = node.get(LINK_KEY).textValue();
}
-
- /**
- * Returns false if the artile was previously seen in another task for this feed
- * @param id
- * @param rssFeed
- * @return
- */
- private boolean seenBefore(String id, String rssFeed) {
- Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed);
- if(previousBatch == null) {
- return false;
- }
- return previousBatch.contains(id);
+ return id;
+ }
+
+ /**
+ * Returns true if the article was previously seen in another task for this feed.
+ * @param id id
+ * @param rssFeed rssFeed
+ * @return boolean seenBefore
+ */
+ private boolean seenBefore(String id, String rssFeed) {
+ Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed);
+ if (previousBatch == null) {
+ return false;
}
+ return previousBatch.contains(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
index 99ccbf3..e4bfd35 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
@@ -15,12 +15,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.rss.provider.perpetual;
-import com.google.common.collect.Maps;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.provider.RssStreamProviderTask;
+
+import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,82 +33,92 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- *
+ * RssFeedScheduler launches threads to collect data from rss feeds.
*/
public class RssFeedScheduler implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class);
- private static final int DEFAULT_PEROID = 10; // 1 minute
+ private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class);
+ private static final int DEFAULT_PEROID = 10; // 10 minutes
- private ExecutorService service;
- private List<FeedDetails> feedDetailsList;
- private int peroid;
- private AtomicBoolean keepRunning;
- private AtomicBoolean complete;
- private Map<String, Long> lastScheduled;
- private BlockingQueue<StreamsDatum> dataQueue;
+ private ExecutorService service;
+ private List<FeedDetails> feedDetailsList;
+ private int peroid;
+ private AtomicBoolean keepRunning;
+ private AtomicBoolean complete;
+ private Map<String, Long> lastScheduled;
+ private BlockingQueue<StreamsDatum> dataQueue;
- public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) {
- this(service, feedDetailsList, dataQueue, DEFAULT_PEROID);
- }
+ public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) {
+ this(service, feedDetailsList, dataQueue, DEFAULT_PEROID);
+ }
- public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) {
- this.service = service;
- this.feedDetailsList = feedDetailsList;
- this.peroid = peroid;
- this.keepRunning = new AtomicBoolean(true);
- this.lastScheduled = Maps.newHashMap();
- this.dataQueue = dataQueue;
- this.complete = new AtomicBoolean(false);
- }
+ /**
+ * RssFeedScheduler constructor.
+ * @param service service
+ * @param feedDetailsList feedDetailsList
+ * @param dataQueue dataQueue
+ * @param peroid peroid
+ */
+ public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) {
+ this.service = service;
+ this.feedDetailsList = feedDetailsList;
+ this.peroid = peroid;
+ this.keepRunning = new AtomicBoolean(true);
+ this.lastScheduled = Maps.newHashMap();
+ this.dataQueue = dataQueue;
+ this.complete = new AtomicBoolean(false);
+ }
- public void stop() {
- this.keepRunning.set(false);
- }
+ public void stop() {
+ this.keepRunning.set(false);
+ }
- public boolean isComplete() {
- return this.complete.get();
- }
+ public boolean isComplete() {
+ return this.complete.get();
+ }
- @Override
- public void run() {
- this.complete.set(false);
- try {
- if(this.peroid <= 0) {
- scheduleFeeds();
- } else {
- while (this.keepRunning.get()) {
- scheduleFeeds();
- Thread.sleep(this.peroid * 60000);
- }
- }
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } finally {
- this.service = null;
- LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName());
- this.complete.set(true);
+ @Override
+ public void run() {
+ this.complete.set(false);
+ try {
+ if (this.peroid <= 0) {
+ scheduleFeeds();
+ } else {
+ while (this.keepRunning.get()) {
+ scheduleFeeds();
+ Thread.sleep(this.peroid * 60000);
}
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } finally {
+ this.service = null;
+ LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName());
+ this.complete.set(true);
}
+ }
- public void scheduleFeeds() {
- for(FeedDetails detail : this.feedDetailsList) {
- Long lastTime = null;
- if((lastTime = this.lastScheduled.get(detail.getUrl())) == null) {
- lastTime = 0L;
- }
- long currentTime = System.currentTimeMillis();
- long pollInterval;
- if(detail.getPollIntervalMillis() == null) {
- pollInterval = 0;
- } else {
- pollInterval = detail.getPollIntervalMillis();
- }
- if(currentTime - lastTime > pollInterval) {
- this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
- this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
- this.lastScheduled.put(detail.getUrl(), currentTime);
- }
- }
+ /**
+ * Schedule Feeds.
+ */
+ public void scheduleFeeds() {
+ for (FeedDetails detail : this.feedDetailsList) {
+ Long lastTime = null;
+ if ((lastTime = this.lastScheduled.get(detail.getUrl())) == null) {
+ lastTime = 0L;
+ }
+ long currentTime = System.currentTimeMillis();
+ long pollInterval;
+ if (detail.getPollIntervalMillis() == null) {
+ pollInterval = 0;
+ } else {
+ pollInterval = detail.getPollIntervalMillis();
+ }
+ if (currentTime - lastTime > pollInterval) {
+ this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
+ this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
+ this.lastScheduled.put(detail.getUrl(), currentTime);
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index e323f27..1e3aedd 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -18,11 +18,6 @@
package org.apache.streams.rss.serializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -30,6 +25,12 @@ import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Author;
import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -40,194 +41,200 @@ import java.util.List;
public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
- private boolean includeRomeExtension;
+ private boolean includeRomeExtension;
- public SyndEntryActivitySerializer() {
- this(true);
- }
-
- public SyndEntryActivitySerializer(boolean includeRomeExtension) {
- this.includeRomeExtension = includeRomeExtension;
- }
+ public SyndEntryActivitySerializer() {
+ this(true);
+ }
+ public SyndEntryActivitySerializer(boolean includeRomeExtension) {
+ this.includeRomeExtension = includeRomeExtension;
+ }
- @Override
- public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
- List<Activity> result = new LinkedList<>();
- for (ObjectNode node : objectNodes) {
- result.add(deserialize(node));
- }
- return result;
+ @Override
+ public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
+ List<Activity> result = new LinkedList<>();
+ for (ObjectNode node : objectNodes) {
+ result.add(deserialize(node));
}
-
- @Override
- public String serializationFormat() {
- return "application/streams-provider-rss";
+ return result;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "application/streams-provider-rss";
+ }
+
+ @Override
+ public ObjectNode serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Rome");
+ }
+
+ @Override
+ public Activity deserialize(ObjectNode syndEntry) {
+ return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+ }
+
+ /**
+ * Deserializes an ObjectNode entry into an Activity, optionally adding the Rome extension.
+ * @param entry ObjectNode
+ * @param withExtension whether to add Rome Extension
+ * @return Activity
+ */
+ public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
+ Preconditions.checkNotNull(entry);
+
+ Activity activity = new Activity();
+ Provider provider = buildProvider(entry);
+ ActivityObject actor = buildActor(entry);
+ ActivityObject activityObject = buildActivityObject(entry);
+
+ activityObject.setUrl(provider.getUrl());
+ activityObject.setAuthor(actor.getAuthor());
+
+ activity.setUrl(provider.getUrl());
+ activity.setProvider(provider);
+ activity.setActor(actor);
+ activity.setVerb("post");
+ activity.setId("id:rss:post:" + activity.getUrl());
+
+ JsonNode published = entry.get("publishedDate");
+ if (published != null) {
+ try {
+ activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
+ } catch (Exception ex) {
+ LOGGER.warn("Failed to parse date : {}", published.textValue());
+
+ DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
+ activity.setPublished(now);
+ }
}
- @Override
- public ObjectNode serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Rome");
- }
+ activity.setUpdated(activityObject.getUpdated());
+ activity.setObject(activityObject);
- @Override
- public Activity deserialize(ObjectNode syndEntry) {
- return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+ if (withExtension) {
+ activity = addRomeExtension(activity, entry);
}
- public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
- Preconditions.checkNotNull(entry);
-
- Activity activity = new Activity();
- Provider provider = buildProvider(entry);
- ActivityObject actor = buildActor(entry);
- ActivityObject activityObject = buildActivityObject(entry);
-
- activityObject.setUrl(provider.getUrl());
- activityObject.setAuthor(actor.getAuthor());
-
- activity.setUrl(provider.getUrl());
- activity.setProvider(provider);
- activity.setActor(actor);
- activity.setVerb("post");
- activity.setId("id:rss:post:" + activity.getUrl());
-
- JsonNode published = entry.get("publishedDate");
- if (published != null) {
- try {
- activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
- } catch (Exception e) {
- LOGGER.warn("Failed to parse date : {}", published.textValue());
-
- DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
- activity.setPublished(now);
- }
- }
-
- activity.setUpdated(activityObject.getUpdated());
- activity.setObject(activityObject);
-
- if (withExtension) {
- activity = addRomeExtension(activity, entry);
- }
-
- return activity;
+ return activity;
+ }
+
+ /**
+ * Given an RSS entry, extract the author and actor information and return it
+ * in an actor object.
+ *
+ * @param entry entry
+ * @return $.actor
+ */
+ private ActivityObject buildActor(ObjectNode entry) {
+ ActivityObject actor = new ActivityObject();
+ Author author = new Author();
+
+ if (entry.get("author") != null) {
+ author.setId(entry.get("author").textValue());
+ author.setDisplayName(entry.get("author").textValue());
+
+ actor.setAuthor(author);
+ String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
+
+ actor.setId("id:rss:" + uriToSet + ":" + author.getId());
+ actor.setDisplayName(author.getDisplayName());
}
- /**
- * Given an RSS entry, extra out the author and actor information and return it
- * in an actor object
- *
- * @param entry
- * @return
- */
- private ActivityObject buildActor(ObjectNode entry) {
- ActivityObject actor = new ActivityObject();
- Author author = new Author();
-
- if (entry.get("author") != null) {
- author.setId(entry.get("author").textValue());
- author.setDisplayName(entry.get("author").textValue());
-
- actor.setAuthor(author);
- String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
-
- actor.setId("id:rss:" + uriToSet + ":" + author.getId());
- actor.setDisplayName(author.getDisplayName());
- }
-
- return actor;
+ return actor;
+ }
+
+ /**
+ * Given an RSS object, build the ActivityObject.
+ *
+ * @param entry ObjectNode
+ * @return $.object
+ */
+ private ActivityObject buildActivityObject(ObjectNode entry) {
+ ActivityObject activityObject = new ActivityObject();
+
+ JsonNode summary = entry.get("description");
+ if (summary != null) {
+ activityObject.setSummary(summary.textValue());
+ } else if ((summary = entry.get("title")) != null) {
+ activityObject.setSummary(summary.textValue());
}
- /**
- * Given an RSS object, build the ActivityObject
- *
- * @param entry
- * @return
- */
- private ActivityObject buildActivityObject(ObjectNode entry) {
- ActivityObject activityObject = new ActivityObject();
+ return activityObject;
+ }
- JsonNode summary = entry.get("description");
- if (summary != null)
- activityObject.setSummary(summary.textValue());
- else if((summary = entry.get("title")) != null) {
- activityObject.setSummary(summary.textValue());
- }
+ /**
+ * Given an RSS object, build and return the Provider object.
+ *
+ * @param entry ObjectNode
+ * @return $.provider
+ */
+ private Provider buildProvider(ObjectNode entry) {
+ Provider provider = new Provider();
- return activityObject;
- }
+ String link = null;
+ String uri = null;
+ String resourceLocation = null;
- /**
- * Given an RSS object, build and return the Provider object
- *
- * @param entry
- * @return
- */
- private Provider buildProvider(ObjectNode entry) {
- Provider provider = new Provider();
-
- String link = null;
- String uri = null;
- String resourceLocation = null;
-
- if (entry.get("link") != null)
- link = entry.get("link").textValue();
- if (entry.get("uri") != null)
- uri = entry.get("uri").textValue();
-
- /*
- * Order of precedence for resourceLocation selection
- *
- * 1. Valid URI
- * 2. Valid Link
- * 3. Non-null URI
- * 4. Non-null Link
- */
- if(isValidResource(uri))
- resourceLocation = uri;
- else if(isValidResource(link))
- resourceLocation = link;
- else if(uri != null || link != null) {
- resourceLocation = (uri != null) ? uri : link;
- }
-
- provider.setId("id:providers:rss");
- provider.setUrl(resourceLocation);
- provider.setDisplayName("RSS");
-
- return provider;
+ if (entry.get("link") != null) {
+ link = entry.get("link").textValue();
}
-
- /**
- * Tests whether or not the passed in resource is a valid URI
- * @param resource
- * @return boolean of whether or not the resource is valid
- */
- private boolean isValidResource(String resource) {
- return resource != null && (resource.startsWith("http") || resource.startsWith("www"));
+ if (entry.get("uri") != null) {
+ uri = entry.get("uri").textValue();
}
-
- /**
- * Given an RSS object and an existing activity,
- * add the Rome extension to that activity and return it
+ /*
+ * Order of precedence for resourceLocation selection
*
- * @param activity
- * @param entry
- * @return
+ * 1. Valid URI
+ * 2. Valid Link
+ * 3. Non-null URI
+ * 4. Non-null Link
*/
- private Activity addRomeExtension(Activity activity, ObjectNode entry) {
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
- ObjectNode extensions = JsonNodeFactory.instance.objectNode();
-
- extensions.put("rome", entry);
- activityRoot.put("extensions", extensions);
-
- activity = mapper.convertValue(activityRoot, Activity.class);
-
- return activity;
+ if (isValidResource(uri)) {
+ resourceLocation = uri;
+ } else if (isValidResource(link)) {
+ resourceLocation = link;
+ } else if (uri != null || link != null) {
+ resourceLocation = (uri != null) ? uri : link;
}
+
+ provider.setId("id:providers:rss");
+ provider.setUrl(resourceLocation);
+ provider.setDisplayName("RSS");
+
+ return provider;
+ }
+
+ /**
+ * Tests whether or not the passed in resource is a valid URI.
+ * @param resource resource
+ * @return boolean of whether or not the resource is valid
+ */
+ private boolean isValidResource(String resource) {
+ return resource != null && (resource.startsWith("http") || resource.startsWith("www"));
+ }
+
+ /**
+ * Given an RSS object and an existing activity,
+ * add the Rome extension to that activity and return it.
+ *
+ * @param activity Activity
+ * @param entry ObjectNode
+ * @return Activity
+ */
+ private Activity addRomeExtension(Activity activity, ObjectNode entry) {
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
+ ObjectNode extensions = JsonNodeFactory.instance.objectNode();
+
+ extensions.put("rome", entry);
+ activityRoot.put("extensions", extensions);
+
+ activity = mapper.convertValue(activityRoot, Activity.class);
+
+ return activity;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
index 1135172..6868bfc 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
@@ -26,7 +26,12 @@ import com.sun.syndication.feed.module.Module;
import com.sun.syndication.feed.rss.Category;
import com.sun.syndication.feed.rss.Content;
import com.sun.syndication.feed.rss.Enclosure;
-import com.sun.syndication.feed.synd.*;
+import com.sun.syndication.feed.synd.SyndContent;
+import com.sun.syndication.feed.synd.SyndEnclosure;
+import com.sun.syndication.feed.synd.SyndEntry;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.feed.synd.SyndImage;
+import com.sun.syndication.feed.synd.SyndLinkImpl;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
@@ -42,267 +47,284 @@ import java.util.List;
*/
public class SyndEntrySerializer {
- private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class);
- public ObjectNode deserialize(SyndEntry entry) {
- return deserializeRomeEntry(entry);
- }
-
-
- public List<ObjectNode> deserializeAll(Collection<SyndEntry> entries) {
- List<ObjectNode> result = Lists.newLinkedList();
- for(SyndEntry entry : entries) {
- result.add(deserialize(entry));
- }
- return result;
- }
+ public ObjectNode deserialize(SyndEntry entry) {
+ return deserializeRomeEntry(entry);
+ }
+ private ObjectNode deserializeRomeEntry(SyndEntry entry) {
+ JsonNodeFactory factory = JsonNodeFactory.instance;
+ ObjectNode root = factory.objectNode();
+ serializeString(entry.getAuthor(), "author", root);
+ serializeListOfStrings(entry.getAuthors(), "authors", root, factory);
+ serializeCategories(root, factory, entry.getCategories());
+ serializeContents(root, factory, entry.getContents());
+ serializeListOfStrings(entry.getContributors(), "contributors", root, factory);
+ serializeDescription(root, factory, entry.getDescription());
+ serializeEnclosures(root, factory, entry.getEnclosures());
+ serializeForeignMarkUp(root, factory, entry.getForeignMarkup());
+ serializeString(entry.getLink(), "link", root);
+ serializeLinks(root, factory, entry.getLinks());
+ serializeModules(root, factory, entry.getModules());
+ serializeDate(root, entry.getPublishedDate(), "publishedDate");
+ serializeSource(root, factory, entry.getSource());
+ serializeString(entry.getTitle(), "title", root);
+ serializeDate(root, entry.getUpdatedDate(), "updateDate");
+ serializeString(entry.getUri(), "uri", root);
- private ObjectNode deserializeRomeEntry(SyndEntry entry) {
- JsonNodeFactory factory = JsonNodeFactory.instance;
- ObjectNode root = factory.objectNode();
+ return root;
+ }
- serializeString(entry.getAuthor(), "author", root);
- serializeListOfStrings(entry.getAuthors(), "authors", root, factory);
- serializeCategories(root, factory, entry.getCategories());
- serializeContents(root, factory, entry.getContents());
- serializeListOfStrings(entry.getContributors(), "contributors", root, factory);
- serializeDescription(root, factory, entry.getDescription());
- serializeEnclosures(root, factory, entry.getEnclosures());
- serializeForeignMarkUp(root, factory, entry.getForeignMarkup());
- serializeString(entry.getLink(), "link", root);
- serializeLinks(root, factory, entry.getLinks());
- serializeModules(root, factory, entry.getModules());
- serializeDate(root, entry.getPublishedDate(), "publishedDate");
- serializeSource(root, factory, entry.getSource());
- serializeString(entry.getTitle(), "title", root);
- serializeDate(root, entry.getUpdatedDate(), "updateDate");
- serializeString(entry.getUri(), "uri", root);
- return root;
+ private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) {
+ if (categories == null || categories.size() == 0) {
+ return;
}
-
-
- private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) {
- if(categories == null || categories.size() == 0)
- return;
- ArrayNode cats = factory.arrayNode();
- for(Object obj : categories) {
- if(obj instanceof Category) {
- ObjectNode catNode = factory.objectNode();
- Category category = (Category) obj;
- if(category.getDomain() != null)
- catNode.put("domain", category.getDomain());
- if(category.getValue() != null)
- catNode.put("value", category.getValue());
- cats.add(catNode);
- }
- else if(obj instanceof com.sun.syndication.feed.atom.Category) {
- com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj;
- ObjectNode catNode = factory.objectNode();
- if(category.getLabel() != null)
- catNode.put("label", category.getLabel());
- if(category.getScheme() != null)
- catNode.put("scheme", category.getScheme());
- if(category.getSchemeResolved() != null)
- catNode.put("schemeResolved", category.getSchemeResolved());
- if(category.getTerm() != null )
- catNode.put("term", category.getTerm());
- cats.add(catNode);
- }
+ ArrayNode cats = factory.arrayNode();
+ for (Object obj : categories) {
+ if (obj instanceof Category) {
+ ObjectNode catNode = factory.objectNode();
+ Category category = (Category) obj;
+ if (category.getDomain() != null) {
+ catNode.put("domain", category.getDomain());
+ }
+ if (category.getValue() != null) {
+ catNode.put("value", category.getValue());
}
- root.put("categories", cats);
+ cats.add(catNode);
+ } else if (obj instanceof com.sun.syndication.feed.atom.Category) {
+ com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj;
+ ObjectNode catNode = factory.objectNode();
+ if (category.getLabel() != null) {
+ catNode.put("label", category.getLabel());
+ }
+ if (category.getScheme() != null) {
+ catNode.put("scheme", category.getScheme());
+ }
+ if (category.getSchemeResolved() != null) {
+ catNode.put("schemeResolved", category.getSchemeResolved());
+ }
+ if (category.getTerm() != null ) {
+ catNode.put("term", category.getTerm());
+ }
+ cats.add(catNode);
+ }
}
+ root.put("categories", cats);
+ }
- private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) {
- if(contents == null || contents.size() == 0)
- return;
- ArrayNode contentsArray = factory.arrayNode();
- for(Object obj : contents) {
- ObjectNode content = factory.objectNode();
- if(obj instanceof Content) {
- Content rssContent = (Content) obj;
- content.put("type", rssContent.getType());
- content.put("value", rssContent.getValue());
- }
- if(obj instanceof com.sun.syndication.feed.atom.Content) {
- com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj;
- content.put("type", atomContent.getType());
- content.put("value", atomContent.getValue());
- content.put("mode", atomContent.getMode());
- content.put("src", atomContent.getSrc());
- }
- contentsArray.add(content);
- }
- root.put("contents", contentsArray);
+ private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) {
+ if (contents == null || contents.size() == 0) {
+ return;
+ }
+ ArrayNode contentsArray = factory.arrayNode();
+ for (Object obj : contents) {
+ ObjectNode content = factory.objectNode();
+ if (obj instanceof Content) {
+ Content rssContent = (Content) obj;
+ content.put("type", rssContent.getType());
+ content.put("value", rssContent.getValue());
+ }
+ if (obj instanceof com.sun.syndication.feed.atom.Content) {
+ com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj;
+ content.put("type", atomContent.getType());
+ content.put("value", atomContent.getValue());
+ content.put("mode", atomContent.getMode());
+ content.put("src", atomContent.getSrc());
+ }
+ contentsArray.add(content);
}
+ root.put("contents", contentsArray);
+ }
- private void serializeDate(ObjectNode root, Date date, String key) {
- DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
- if(date == null)
- return;
- root.put(key, formatter.print(date.getTime()));
+ private void serializeDate(ObjectNode root, Date date, String key) {
+ DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
+ if (date == null) {
+ return;
}
+ root.put(key, formatter.print(date.getTime()));
+ }
- private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) {
- if(synd == null)
- return;
- ObjectNode content = factory.objectNode();
- if(synd.getValue() != null)
- content.put("value", synd.getValue());
- if(synd.getMode() != null)
- content.put("mode", synd.getMode());
- if(synd.getType() != null)
- content.put("type", synd.getType());
- root.put("description", content);
+ private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) {
+ if (synd == null) {
+ return;
+ }
+ ObjectNode content = factory.objectNode();
+ if (synd.getValue() != null) {
+ content.put("value", synd.getValue());
}
+ if (synd.getMode() != null) {
+ content.put("mode", synd.getMode());
+ }
+ if (synd.getType() != null) {
+ content.put("type", synd.getType());
+ }
+ root.put("description", content);
+ }
- private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) {
- if(enclosures == null || enclosures.size() == 0)
- return;
- ArrayNode encls = factory.arrayNode();
- for(Object obj : enclosures) {
- if(obj instanceof Enclosure){
- Enclosure enclosure = (Enclosure) obj;
- ObjectNode encl = factory.objectNode();
- if(enclosure.getType() != null)
- encl.put("type", enclosure.getType());
- if(enclosure.getUrl() != null)
- encl.put("url", enclosure.getUrl());
- encl.put("length", enclosure.getLength());
- encls.add(encl);
- } else if(obj instanceof SyndEnclosure) {
- SyndEnclosure enclosure = (SyndEnclosure) obj;
- ObjectNode encl = factory.objectNode();
- if(enclosure.getType() != null)
- encl.put("type", enclosure.getType());
- if(enclosure.getUrl() != null)
- encl.put("url", enclosure.getUrl());
- encl.put("length", enclosure.getLength());
- encls.add(encl);
- } else {
- LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString());
- }
+ private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) {
+ if (enclosures == null || enclosures.size() == 0) {
+ return;
+ }
+ ArrayNode encls = factory.arrayNode();
+ for (Object obj : enclosures) {
+ if (obj instanceof Enclosure) {
+ Enclosure enclosure = (Enclosure) obj;
+ ObjectNode encl = factory.objectNode();
+ if (enclosure.getType() != null) {
+ encl.put("type", enclosure.getType());
+ }
+ if (enclosure.getUrl() != null) {
+ encl.put("url", enclosure.getUrl());
}
- root.put("enclosures", encls);
+ encl.put("length", enclosure.getLength());
+ encls.add(encl);
+ } else if (obj instanceof SyndEnclosure) {
+ SyndEnclosure enclosure = (SyndEnclosure) obj;
+ ObjectNode encl = factory.objectNode();
+ if (enclosure.getType() != null) {
+ encl.put("type", enclosure.getType());
+ }
+ if (enclosure.getUrl() != null) {
+ encl.put("url", enclosure.getUrl());
+ }
+ encl.put("length", enclosure.getLength());
+ encls.add(encl);
+ } else {
+ LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString());
+ }
}
+ root.put("enclosures", encls);
+ }
- private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) {
- if(foreignMarkUp == null)
- return;
- if(foreignMarkUp instanceof String) {
- root.put("foreignEnclosures", (String) foreignMarkUp);
- } else if (foreignMarkUp instanceof List) {
- List foreignList = (List) foreignMarkUp;
- if(foreignList.size() == 0)
- return;
- if(foreignList.get(0) instanceof String) {
- serializeListOfStrings(foreignList, "foreignEnclosures", root, factory);
- } else {
- LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}", ((List)foreignMarkUp).get(0).getClass().toString());
- }
- } else {
- LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}", foreignMarkUp.getClass().toString());
- }
+ private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) {
+ if (foreignMarkUp == null) {
+ return;
}
+ if (foreignMarkUp instanceof String) {
+ root.put("foreignEnclosures", (String) foreignMarkUp);
+ } else if (foreignMarkUp instanceof List) {
+ List foreignList = (List) foreignMarkUp;
+ if (foreignList.size() == 0) {
+ return;
+ }
+ if (foreignList.get(0) instanceof String) {
+ serializeListOfStrings(foreignList, "foreignEnclosures", root, factory);
+ } else {
+ LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}",
+ ((List)foreignMarkUp).get(0).getClass().toString());
+ }
+ } else {
+ LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}",
+ foreignMarkUp.getClass().toString());
+ }
+ }
- private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) {
- if(image == null)
- return;
- ObjectNode imageNode = factory.objectNode();
- serializeString(image.getDescription(), "description", imageNode);
- serializeString(image.getLink(), "link", imageNode);
- serializeString(image.getUrl(), "url", imageNode);
- serializeString(image.getTitle(), "title", imageNode);
- root.put("image", imageNode);
+ private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) {
+ if (image == null) {
+ return;
}
+ ObjectNode imageNode = factory.objectNode();
+ serializeString(image.getDescription(), "description", imageNode);
+ serializeString(image.getLink(), "link", imageNode);
+ serializeString(image.getUrl(), "url", imageNode);
+ serializeString(image.getTitle(), "title", imageNode);
+ root.put("image", imageNode);
+ }
- private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) {
- if(toSerialize == null || toSerialize.size() == 0)
- return;
- ArrayNode keyNode = factory.arrayNode();
- for(Object obj : toSerialize) {
- if(obj instanceof String) {
- keyNode.add((String) obj);
- } else {
- LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString());
- }
- }
- node.put(key, keyNode);
+ private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) {
+ if (toSerialize == null || toSerialize.size() == 0) {
+ return;
}
+ ArrayNode keyNode = factory.arrayNode();
+ for (Object obj : toSerialize) {
+ if (obj instanceof String) {
+ keyNode.add((String) obj);
+ } else {
+ LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString());
+ }
+ }
+ node.put(key, keyNode);
+ }
- private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
- if(links == null || links.size() == 0) {
- return;
- } else if(links.get(0) instanceof String) {
- serializeListOfStrings(links, "links", root, factory);
- } else if(links.get(0) instanceof SyndLinkImpl) {
- ArrayNode linksArray = factory.arrayNode();
- SyndLinkImpl syndLink;
- ObjectNode linkNode;
- for(Object obj : links) {
- linkNode = factory.objectNode();
- syndLink = (SyndLinkImpl) obj;
- linkNode.put("rel", syndLink.getRel());
- linkNode.put("href", syndLink.getHref());
- linkNode.put("type", syndLink.getType());
- linkNode.put("length", syndLink.getLength());
- linkNode.put("hrefLang", syndLink.getHreflang());
- linkNode.put("title", syndLink.getTitle());
- linksArray.add(linkNode);
- }
- root.put("links", linksArray);
- } else {
- LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString());
- }
+ private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
+ if (links == null || links.size() == 0) {
+ return;
+ } else if (links.get(0) instanceof String) {
+ serializeListOfStrings(links, "links", root, factory);
+ } else if (links.get(0) instanceof SyndLinkImpl) {
+ ArrayNode linksArray = factory.arrayNode();
+ SyndLinkImpl syndLink;
+ ObjectNode linkNode;
+ for (Object obj : links) {
+ linkNode = factory.objectNode();
+ syndLink = (SyndLinkImpl) obj;
+ linkNode.put("rel", syndLink.getRel());
+ linkNode.put("href", syndLink.getHref());
+ linkNode.put("type", syndLink.getType());
+ linkNode.put("length", syndLink.getLength());
+ linkNode.put("hrefLang", syndLink.getHreflang());
+ linkNode.put("title", syndLink.getTitle());
+ linksArray.add(linkNode);
+ }
+ root.put("links", linksArray);
+ } else {
+ LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString());
}
+ }
- private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) {
- if(modules == null || modules.size() == 0)
- return;
- ArrayNode modulesArray = factory.arrayNode();
- for(Object obj : modules) {
- if(obj instanceof Module) {
- Module mod = (Module) obj;
- if(mod.getUri() != null)
- modulesArray.add(mod.getUri());
- } else {
- LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}", obj.getClass().toString());
- }
+ private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) {
+ if (modules == null || modules.size() == 0) {
+ return;
+ }
+ ArrayNode modulesArray = factory.arrayNode();
+ for (Object obj : modules) {
+ if (obj instanceof Module) {
+ Module mod = (Module) obj;
+ if (mod.getUri() != null) {
+ modulesArray.add(mod.getUri());
}
- root.put("modules", modulesArray);
+ } else {
+ LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}",
+ obj.getClass().toString());
+ }
}
+ root.put("modules", modulesArray);
+ }
- private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) {
- if(source == null)
- return;
- ObjectNode sourceNode = factory.objectNode();
- serializeString(source.getAuthor(), "author", sourceNode);
- serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory);
- serializeCategories(sourceNode, factory, source.getCategories());
- serializeString(source.getCopyright(), "copyright", sourceNode);
- serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory);
- serializeString(source.getDescription(), "description", sourceNode);
- serializeDescription(sourceNode, factory, source.getDescriptionEx());
- // source.getEntries(); wtf?
- serializeString(source.getFeedType(), "feedType", sourceNode);
- serializeImage(sourceNode, factory, source.getImage());
- serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup());
- serializeString(source.getLanguage(), "language", sourceNode);
- serializeString(source.getLink(), "link", sourceNode);
- serializeListOfStrings(source.getLinks(), "links", sourceNode, factory);
- serializeModules(sourceNode, factory, source.getModules());
- serializeDate(sourceNode, source.getPublishedDate(), "publishedDate");
- serializeString(source.getTitle(), "title", sourceNode);
- serializeString(source.getUri(), "uri", sourceNode);
-
- root.put("source", sourceNode);
+ private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) {
+ if (source == null) {
+ return;
}
+ ObjectNode sourceNode = factory.objectNode();
+ serializeString(source.getAuthor(), "author", sourceNode);
+ serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory);
+ serializeCategories(sourceNode, factory, source.getCategories());
+ serializeString(source.getCopyright(), "copyright", sourceNode);
+ serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory);
+ serializeString(source.getDescription(), "description", sourceNode);
+ serializeDescription(sourceNode, factory, source.getDescriptionEx());
+ // source.getEntries(); wtf?
+ serializeString(source.getFeedType(), "feedType", sourceNode);
+ serializeImage(sourceNode, factory, source.getImage());
+ serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup());
+ serializeString(source.getLanguage(), "language", sourceNode);
+ serializeString(source.getLink(), "link", sourceNode);
+ serializeListOfStrings(source.getLinks(), "links", sourceNode, factory);
+ serializeModules(sourceNode, factory, source.getModules());
+ serializeDate(sourceNode, source.getPublishedDate(), "publishedDate");
+ serializeString(source.getTitle(), "title", sourceNode);
+ serializeString(source.getUri(), "uri", sourceNode);
+
+ root.put("source", sourceNode);
+ }
- private void serializeString(String string, String key, ObjectNode node) {
- if(string != null && !string.equals(""))
- node.put(key, string);
+ private void serializeString(String string, String key, ObjectNode node) {
+ if (string != null && !string.equals("")) {
+ node.put(key, string);
}
+ }
}