Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:05 UTC

[24/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index d7dc918..9de1863 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -18,14 +18,6 @@
 
 package org.apache.streams.rss.provider;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
@@ -37,6 +29,15 @@ import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.RssStreamConfiguration;
 import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
 import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,173 +62,159 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format
- *
- *  To use from command line:
- *
- *  Supply configuration similar to src/test/resources/rss.conf
- *
- *  Launch using:
- *
- *  mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
  */
 public class RssStreamProvider implements StreamsProvider {
 
-    public static final String STREAMS_ID = "RssStreamProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
-
-    private final static int MAX_SIZE = 1000;
-
-    private RssStreamConfiguration config;
-    private boolean perpetual;
-    private ExecutorService executor;
-    private BlockingQueue<StreamsDatum> dataQueue;
-    private AtomicBoolean isComplete;
-
-    @VisibleForTesting
-    protected RssFeedScheduler scheduler;
-
-    public RssStreamProvider() {
-        this(new ComponentConfigurator<>(RssStreamConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false);
-    }
-
-    public RssStreamProvider(boolean perpetual) {
-        this(new ComponentConfigurator<>(RssStreamConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual);
-    }
-
-    public RssStreamProvider(RssStreamConfiguration config) {
-        this(config, false);
-    }
-
-    public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) {
-        this.perpetual = perpetual;
-        this.config = config;
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    public void setConfig(RssStreamConfiguration config) {
-        this.config = config;
-    }
-
-    public void setRssFeeds(Set<String> urlFeeds) {
-    }
-
-    public void setRssFeeds(Map<String, Long> feeds) {
-        if(this.config == null) {
-            this.config = new RssStreamConfiguration();
-        }
-        List<FeedDetails> feedDetails = new ArrayList<>();
-        for(String feed : feeds.keySet()) {
-            Long delay = feeds.get(feed);
-            FeedDetails detail = new FeedDetails();
-            detail.setUrl(feed);
-            detail.setPollIntervalMillis(delay);
-            feedDetails.add(detail);
-        }
-        this.config.setFeeds(feedDetails);
-    }
-
-    @Override
-    public void startStream() {
-        LOGGER.trace("Starting Rss Scheduler");
-        this.executor.submit(this.scheduler);
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
-        int batchSize = 0;
-        while(!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) {
-            StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue);
-            if(datum != null) {
-                ++batchSize;
-                batch.add(datum);
-            }
+  public static final String STREAMS_ID = "RssStreamProvider";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
+
+  private static final int MAX_SIZE = 1000;
+
+  private RssStreamConfiguration config;
+  private boolean perpetual;
+  private ExecutorService executor;
+  private BlockingQueue<StreamsDatum> dataQueue;
+  private AtomicBoolean isComplete;
+
+  @VisibleForTesting
+  protected RssFeedScheduler scheduler;
+
+  public RssStreamProvider() {
+    this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), false);
+  }
+
+  public RssStreamProvider(boolean perpetual) {
+    this(new ComponentConfigurator<>(RssStreamConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("rss")), perpetual);
+  }
+
+  public RssStreamProvider(RssStreamConfiguration config) {
+    this(config, false);
+  }
+
+  public RssStreamProvider(RssStreamConfiguration config, boolean perpetual) {
+    this.perpetual = perpetual;
+    this.config = config;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.trace("Starting Rss Scheduler");
+    this.executor.submit(this.scheduler);
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    Queue<StreamsDatum> batch = new ConcurrentLinkedQueue<>();
+    int batchSize = 0;
+    while (!this.dataQueue.isEmpty() && batchSize < MAX_SIZE) {
+      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.dataQueue);
+      if (datum != null) {
+        ++batchSize;
+        batch.add(datum);
+      }
+    }
+    this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty());
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !this.isComplete.get();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    this.dataQueue = new LinkedBlockingQueue<>();
+    this.scheduler = getScheduler(this.dataQueue);
+    this.isComplete = new AtomicBoolean(false);
+    int consecutiveEmptyReads = 0;
+  }
+
+  @VisibleForTesting
+  protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
+    if (this.perpetual) {
+      return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue);
+    } else {
+      return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0);
+    }
+  }
+
+  @Override
+  public void cleanUp() {
+    this.scheduler.stop();
+    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply configuration similar to src/test/resources/rss.conf
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
+   * @param args args[0] = configuration file path, args[1] = output file path
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
+    RssStreamProvider provider = new RssStreamProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      for (StreamsDatum datum : provider.readCurrent()) {
+        String json;
+        try {
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
         }
-        this.isComplete.set(this.scheduler.isComplete() && batch.isEmpty() && this.dataQueue.isEmpty());
-        return new StreamsResultSet(batch);
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !this.isComplete.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        this.dataQueue = new LinkedBlockingQueue<>();
-        this.scheduler = getScheduler(this.dataQueue);
-        this.isComplete = new AtomicBoolean(false);
-        int consecutiveEmptyReads = 0;
-    }
-
-    @VisibleForTesting
-    protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
-        if(this.perpetual)
-            return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue);
-        else
-            return new RssFeedScheduler(this.executor, this.config.getFeeds(), queue, 0);
-    }
-
-    @Override
-    public void cleanUp() {
-        this.scheduler.stop();
-        ComponentUtils.shutdownExecutor(this.executor, 10, 10);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
-        RssStreamProvider provider = new RssStreamProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            for (StreamsDatum datum : provider.readCurrent()) {
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+      }
     }
+    while (provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }
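
For reference, a minimal sketch of driving the reformatted provider programmatically rather than through mvn exec:java, using only the constructors and lifecycle methods visible in this hunk (prepare, startStream, readCurrent, isRunning, cleanUp). The feed URL, poll interval, and sleep interval below are placeholder values, not part of the commit.

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.RssStreamProvider;

import java.util.Collections;

public class RssStreamProviderSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical feed; pollIntervalMillis is only consulted in perpetual mode.
    FeedDetails detail = new FeedDetails();
    detail.setUrl("http://example.org/feed.rss");
    detail.setPollIntervalMillis(60000L);

    RssStreamConfiguration configuration = new RssStreamConfiguration();
    configuration.setFeeds(Collections.singletonList(detail));

    // Non-perpetual provider: the scheduler reads each feed once and then reports complete.
    RssStreamProvider provider = new RssStreamProvider(configuration);
    provider.prepare(configuration);
    provider.startStream();
    do {
      Thread.sleep(1000);
      for (StreamsDatum datum : provider.readCurrent()) {
        System.out.println(datum.getDocument());
      }
    } while (provider.isRunning());
    provider.cleanUp();
  }
}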

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
index 3800a51..03a66d1 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java
@@ -18,27 +18,23 @@
 
 package org.apache.streams.rss.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.rss.serializer.SyndEntrySerializer;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.sun.syndication.feed.synd.SyndEntry;
 import com.sun.syndication.feed.synd.SyndFeed;
 import com.sun.syndication.io.FeedException;
 import com.sun.syndication.io.SyndFeedInput;
-import com.sun.syndication.io.XmlReader;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.rss.FeedDetails;
-import org.apache.streams.rss.serializer.SyndEntrySerializer;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.util.Collections;
@@ -50,15 +46,19 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * A {@link java.lang.Runnable} task that queues rss feed data.
  *
+ * <p/>
  * <code>RssStreamProviderTask</code> reads the content of an rss feed and queues the articles from
  * the feed in the form of a {@link com.fasterxml.jackson.databind.node.ObjectNode} wrapped in a {@link org.apache.streams.core.StreamsDatum}.
  * The task can filter articles by a published date.  If the task cannot parse the date of the article or the article does not contain a
  * published date, by default the task will attempt to queue the article.
  *
- * A task can be run in perpetual mode which will store the article urls in a static variable.  The next time a <code>RssStreamProviderTask</code>
- * is run, it will not queue data that was seen the previous time the rss feed was read.  This is an attempt to reduce
- * multiple copies of an article from being out put by a {@link org.apache.streams.rss.provider.RssStreamProvider}.
+ * <p/>
+ * A task can be run in perpetual mode which will store the article urls in a static variable.  The next time a
+ * <code>RssStreamProviderTask</code> is run, it will not queue data that was seen the previous time the rss feed was read.
+ * This is an attempt to reduce multiple copies of an article from being output by a
+ * {@link org.apache.streams.rss.provider.RssStreamProvider}.
  *
+ * <p/>
  * ** Warning! **
  * It is still possible to output multiple copies of the same article.  If multiple task executions for the same rss feed overlap
  * in execution time, it is possible that the previously seen articles static variable will not have been updated in time.
@@ -66,183 +66,187 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class RssStreamProviderTask implements Runnable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class);
-    private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds
-    private static final String RSS_KEY = "rssFeed";
-    private static final String URI_KEY = "uri";
-    private static final String LINK_KEY = "link";
-    private static final String DATE_KEY = "publishedDate";
-
-    /**
-     * Map that contains the Set of previously seen articles by an rss feed.
-     */
-    @VisibleForTesting
-    protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>();
-
-
-    private BlockingQueue<StreamsDatum> dataQueue;
-    private String rssFeed;
-    private int timeOut;
-    private SyndEntrySerializer serializer;
-    private DateTime publishedSince;
-    private boolean perpetual;
-
-
-    /**
-     * Non-perpetual mode, no date filter, time out of 10 sec
-     * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
-     * @param queue
-     * @param rssFeed
-     */
-    public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) {
-        this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false);
-    }
-
-    /**
-     * Non-perpetual mode, no date filter
-     * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
-     * @param queue
-     * @param rssFeed
-     * @param timeOut
-     */
-    public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) {
-        this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false);
-    }
-
-    /**
-     * Non-perpetual mode, time out of 10 sec
-     * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask#RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
-     * @param queue
-     * @param rssFeed
-     * @param publishedSince
-     */
-    public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) {
-        this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false);
-    }
-
-    /**
-     * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents
-     * being object nodes.
-     * @param queue Queue to push data to
-     * @param rssFeed url of rss feed to read
-     * @param publishedSince DateTime to filter articles by, will queue articles with published times after this
-     * @param timeOut url connection timeout in milliseconds
-     * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED
-     */
-    public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) {
-        this.dataQueue = queue;
-        this.rssFeed = rssFeed;
-        this.timeOut = timeOut;
-        this.publishedSince = publishedSince;
-        this.serializer = new SyndEntrySerializer();
-        this.perpetual = perpetual;
-    }
-
-    /**
-     * The rss feed url that this task is responsible for reading
-     * @return rss feed url
-     */
-    public String getRssFeed() {
-        return this.rssFeed;
-    }
-
-    @Override
-    public void run() {
-        try {
-            Set<String> batch = queueFeedEntries(new URL(this.rssFeed));
-            if(this.perpetual)
-                PREVIOUSLY_SEEN.put(this.getRssFeed(), batch);
-        } catch (IOException | FeedException e) {
-            LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, e);
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTask.class);
+  private static final int DEFAULT_TIME_OUT = 10000; // 10 seconds
+  private static final String RSS_KEY = "rssFeed";
+  private static final String URI_KEY = "uri";
+  private static final String LINK_KEY = "link";
+  private static final String DATE_KEY = "publishedDate";
+
+  /**
+   * Map that contains the Set of previously seen articles by an rss feed.
+   */
+  @VisibleForTesting
+  protected static final Map<String, Set<String>> PREVIOUSLY_SEEN = new ConcurrentHashMap<>();
+
+
+  private BlockingQueue<StreamsDatum> dataQueue;
+  private String rssFeed;
+  private int timeOut;
+  private SyndEntrySerializer serializer;
+  private DateTime publishedSince;
+  private boolean perpetual;
+
+
+  /**
+   * Non-perpetual mode, no date filter, time out of 10 sec
+   * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+   * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+   * @param queue queue
+   * @param rssFeed rssFeed
+   */
+  public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed) {
+    this(queue, rssFeed, new DateTime().minusYears(30), DEFAULT_TIME_OUT, false);
+  }
+
+  /**
+   * Non-perpetual mode, no date filter.
+   * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+   * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+   * @param queue queue
+   * @param rssFeed rssFeed
+   * @param timeOut timeOut
+   */
+  public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, int timeOut) {
+    this(queue, rssFeed, new DateTime().minusYears(30), timeOut, false);
+  }
+
+  /**
+   * Non-perpetual mode, time out of 10 sec
+   * @see {@link org.apache.streams.rss.provider.RssStreamProviderTask
+   * #RssStreamProviderTask(java.util.concurrent.BlockingQueue, String, org.joda.time.DateTime, int, boolean)}
+   * @param queue queue
+   * @param rssFeed rssFeed
+   * @param publishedSince publishedSince
+   */
+  public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince) {
+    this(queue, rssFeed, publishedSince, DEFAULT_TIME_OUT, false);
+  }
+
+  /**
+   * RssStreamProviderTask that reads an rss feed url and queues the resulting articles as StreamsDatums with the documents
+   * being object nodes.
+   * @param queue Queue to push data to
+   * @param rssFeed url of rss feed to read
+   * @param publishedSince DateTime to filter articles by, will queue articles with published times after this
+   * @param timeOut url connection timeout in milliseconds
+   * @param perpetual true, if you want to run in perpetual mode. NOT RECOMMENDED
+   */
+  public RssStreamProviderTask(BlockingQueue<StreamsDatum> queue, String rssFeed, DateTime publishedSince, int timeOut, boolean perpetual) {
+    this.dataQueue = queue;
+    this.rssFeed = rssFeed;
+    this.timeOut = timeOut;
+    this.publishedSince = publishedSince;
+    this.serializer = new SyndEntrySerializer();
+    this.perpetual = perpetual;
+  }
+
+  /**
+   * The rss feed url that this task is responsible for reading.
+   * @return rss feed url
+   */
+  public String getRssFeed() {
+    return this.rssFeed;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Set<String> batch = queueFeedEntries(new URL(this.rssFeed));
+      if (this.perpetual) {
+        PREVIOUSLY_SEEN.put(this.getRssFeed(), batch);
+      }
+    } catch (IOException | FeedException ex) {
+      LOGGER.warn("Exception while reading rss stream, {} : {}", this.rssFeed, ex);
     }
-
-    /**
-     * Reads the url and queues the data
-     * @param feedUrl rss feed url
-     * @return set of all article urls that were read from the feed
-     * @throws IOException when it cannot connect to the url or the url is malformed
-     * @throws FeedException when it cannot reed the feed.
-     */
-    @VisibleForTesting
-    protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {
-
-        // ConcurrentHashSet is preferable, but it's only in guava 15+
-        // spark 1.5.0 uses guava 14 so for the moment this is the workaround
-        Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-        URLConnection connection = feedUrl.openConnection();
-        connection.setConnectTimeout(this.timeOut);
-        connection.setConnectTimeout(this.timeOut);
-        SyndFeedInput input = new SyndFeedInput();
-        SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
-        for (Object entryObj : feed.getEntries()) {
-            SyndEntry entry = (SyndEntry) entryObj;
-            ObjectNode nodeEntry = this.serializer.deserialize(entry);
-            nodeEntry.put(RSS_KEY, this.rssFeed);
-            String entryId = determineId(nodeEntry);
-            batch.add(entryId);
-            StreamsDatum datum = new StreamsDatum(nodeEntry);
-            try {
-                JsonNode published = nodeEntry.get(DATE_KEY);
-                if (published != null) {
-                    try {
-                        DateTime date = RFC3339Utils.parseToUTC(published.asText());
-                        if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
-                            this.dataQueue.put(datum);
-                            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
-                        }
-                    } catch (InterruptedException ie) {
-                        Thread.currentThread().interrupt();
-                    } catch (Exception e) {
-                        LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
-                        if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
-                            this.dataQueue.put(datum);
-                            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
-                        }
-                    }
-                } else {
-                    LOGGER.debug("No published date present, attempting to add node to queue by default.");
-                    if(!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
-                        this.dataQueue.put(datum);
-                        LOGGER.debug("Added entry, {}, to provider queue.", entryId);
-                    }
-                }
-            } catch (InterruptedException ie) {
-                LOGGER.error("Interupted Exception.");
-                Thread.currentThread().interrupt();
+  }
+
+  /**
+   * Reads the url and queues the data.
+   * @param feedUrl rss feed url
+   * @return set of all article urls that were read from the feed
+   * @throws IOException when it cannot connect to the url or the url is malformed
+   * @throws FeedException when it cannot read the feed.
+   */
+  @VisibleForTesting
+  protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {
+
+    // ConcurrentHashSet is preferable, but it's only in guava 15+
+    // spark 1.5.0 uses guava 14 so for the moment this is the workaround
+    Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    URLConnection connection = feedUrl.openConnection();
+    connection.setConnectTimeout(this.timeOut);
+    connection.setReadTimeout(this.timeOut);
+    SyndFeedInput input = new SyndFeedInput();
+    SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
+    for (Object entryObj : feed.getEntries()) {
+      SyndEntry entry = (SyndEntry) entryObj;
+      ObjectNode nodeEntry = this.serializer.deserialize(entry);
+      nodeEntry.put(RSS_KEY, this.rssFeed);
+      String entryId = determineId(nodeEntry);
+      batch.add(entryId);
+      StreamsDatum datum = new StreamsDatum(nodeEntry);
+      try {
+        JsonNode published = nodeEntry.get(DATE_KEY);
+        if (published != null) {
+          try {
+            DateTime date = RFC3339Utils.parseToUTC(published.asText());
+            if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
+              this.dataQueue.put(datum);
+              LOGGER.debug("Added entry, {}, to provider queue.", entryId);
+            }
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          } catch (Exception ex) {
+            LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
+            if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
+              this.dataQueue.put(datum);
+              LOGGER.debug("Added entry, {}, to provider queue.", entryId);
             }
+          }
+        } else {
+          LOGGER.debug("No published date present, attempting to add node to queue by default.");
+          if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
+            this.dataQueue.put(datum);
+            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
+          }
         }
-        return batch;
+      } catch (InterruptedException ie) {
+        LOGGER.error("Interupted Exception.");
+        Thread.currentThread().interrupt();
+      }
     }
-
-    /**
-     * Returns a link to the article to use as the id
-     * @param node
-     * @return
-     */
-    private String determineId(ObjectNode node) {
-        String id = null;
-        if(node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) {
-            id = node.get(URI_KEY).textValue();
-        } else if(node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) {
-            id = node.get(LINK_KEY).textValue();
-        }
-        return id;
+    return batch;
+  }
+
+  /**
+   * Returns a link to the article to use as the id.
+   * @param node node
+   * @return String
+   */
+  private String determineId(ObjectNode node) {
+    String id = null;
+    if (node.get(URI_KEY) != null && !node.get(URI_KEY).textValue().equals("")) {
+      id = node.get(URI_KEY).textValue();
+    } else if (node.get(LINK_KEY) != null && !node.get(LINK_KEY).textValue().equals("")) {
+      id = node.get(LINK_KEY).textValue();
     }
-
-    /**
-     * Returns false if the artile was previously seen in another task for this feed
-     * @param id
-     * @param rssFeed
-     * @return
-     */
-    private boolean seenBefore(String id, String rssFeed) {
-        Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed);
-        if(previousBatch == null) {
-            return false;
-        }
-        return previousBatch.contains(id);
+    return id;
+  }
+
+  /**
+   * Returns false if the article was previously seen in another task for this feed.
+   * @param id id
+   * @param rssFeed rssFeed
+   * @return boolean seenBefore
+   */
+  private boolean seenBefore(String id, String rssFeed) {
+    Set<String> previousBatch = PREVIOUSLY_SEEN.get(rssFeed);
+    if (previousBatch == null) {
+      return false;
     }
+    return previousBatch.contains(id);
+  }
 
 
 }
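
For reference, a small sketch of exercising the task on its own, using only the five-argument constructor documented above; the feed URL is a placeholder, and perpetual mode is left off as the javadoc recommends.

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.provider.RssStreamProviderTask;

import org.joda.time.DateTime;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RssStreamProviderTaskSketch {

  public static void main(String[] args) {
    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();

    // Queue entries published in the last 7 days, 10 second connection timeout,
    // perpetual mode disabled.
    RssStreamProviderTask task = new RssStreamProviderTask(
        queue, "http://example.org/feed.rss", DateTime.now().minusDays(7), 10000, false);

    // The task is a plain Runnable, so it can be run inline or handed to an executor.
    task.run();

    StreamsDatum datum;
    while ((datum = queue.poll()) != null) {
      System.out.println(datum.getDocument());
    }
  }
}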

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
index 99ccbf3..e4bfd35 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
@@ -15,12 +15,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.rss.provider.perpetual;
 
-import com.google.common.collect.Maps;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.provider.RssStreamProviderTask;
+
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,82 +33,92 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- *
+ * RssFeedScheduler launches threads to collect data from rss feeds.
  */
 public class RssFeedScheduler implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class);
-    private static final int DEFAULT_PEROID = 10; // 1 minute
+  private static final Logger LOGGER = LoggerFactory.getLogger(RssFeedScheduler.class);
+  private static final int DEFAULT_PEROID = 10; // 10 minutes
 
-    private ExecutorService service;
-    private List<FeedDetails> feedDetailsList;
-    private int peroid;
-    private AtomicBoolean keepRunning;
-    private AtomicBoolean complete;
-    private Map<String, Long> lastScheduled;
-    private BlockingQueue<StreamsDatum> dataQueue;
+  private ExecutorService service;
+  private List<FeedDetails> feedDetailsList;
+  private int peroid;
+  private AtomicBoolean keepRunning;
+  private AtomicBoolean complete;
+  private Map<String, Long> lastScheduled;
+  private BlockingQueue<StreamsDatum> dataQueue;
 
-    public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) {
-        this(service, feedDetailsList, dataQueue,  DEFAULT_PEROID);
-    }
+  public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue) {
+    this(service, feedDetailsList, dataQueue,  DEFAULT_PEROID);
+  }
 
-    public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) {
-        this.service = service;
-        this.feedDetailsList = feedDetailsList;
-        this.peroid = peroid;
-        this.keepRunning = new AtomicBoolean(true);
-        this.lastScheduled = Maps.newHashMap();
-        this.dataQueue = dataQueue;
-        this.complete = new AtomicBoolean(false);
-    }
+  /**
+   * RssFeedScheduler constructor.
+   * @param service service
+   * @param feedDetailsList feedDetailsList
+   * @param dataQueue dataQueue
+   * @param peroid scheduling period in minutes; a value of zero or less schedules each feed only once
+   */
+  public RssFeedScheduler(ExecutorService service, List<FeedDetails> feedDetailsList, BlockingQueue<StreamsDatum> dataQueue, int peroid) {
+    this.service = service;
+    this.feedDetailsList = feedDetailsList;
+    this.peroid = peroid;
+    this.keepRunning = new AtomicBoolean(true);
+    this.lastScheduled = Maps.newHashMap();
+    this.dataQueue = dataQueue;
+    this.complete = new AtomicBoolean(false);
+  }
 
-    public void stop() {
-        this.keepRunning.set(false);
-    }
+  public void stop() {
+    this.keepRunning.set(false);
+  }
 
-    public boolean isComplete() {
-        return this.complete.get();
-    }
+  public boolean isComplete() {
+    return this.complete.get();
+  }
 
-    @Override
-    public void run() {
-        this.complete.set(false);
-        try {
-            if(this.peroid <= 0) {
-                scheduleFeeds();
-            } else {
-                while (this.keepRunning.get()) {
-                    scheduleFeeds();
-                    Thread.sleep(this.peroid * 60000);
-                }
-            }
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        } finally {
-            this.service = null;
-            LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName());
-            this.complete.set(true);
+  @Override
+  public void run() {
+    this.complete.set(false);
+    try {
+      if (this.peroid <= 0) {
+        scheduleFeeds();
+      } else {
+        while (this.keepRunning.get()) {
+          scheduleFeeds();
+          Thread.sleep(this.peroid * 60000);
         }
+      }
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    } finally {
+      this.service = null;
+      LOGGER.info("{} completed scheduling of feeds.", this.getClass().getName());
+      this.complete.set(true);
     }
+  }
 
-    public void scheduleFeeds() {
-        for(FeedDetails detail : this.feedDetailsList) {
-            Long lastTime = null;
-            if((lastTime = this.lastScheduled.get(detail.getUrl())) == null) {
-                lastTime = 0L;
-            }
-            long currentTime = System.currentTimeMillis();
-            long pollInterval;
-            if(detail.getPollIntervalMillis() == null) {
-                pollInterval = 0;
-            } else {
-                pollInterval = detail.getPollIntervalMillis();
-            }
-            if(currentTime - lastTime > pollInterval) {
-                this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
-                this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
-                this.lastScheduled.put(detail.getUrl(), currentTime);
-            }
-        }
+  /**
+   * Schedule collection tasks for any feeds whose poll interval has elapsed.
+   */
+  public void scheduleFeeds() {
+    for (FeedDetails detail : this.feedDetailsList) {
+      Long lastTime = null;
+      if ((lastTime = this.lastScheduled.get(detail.getUrl())) == null) {
+        lastTime = 0L;
+      }
+      long currentTime = System.currentTimeMillis();
+      long pollInterval;
+      if (detail.getPollIntervalMillis() == null) {
+        pollInterval = 0;
+      } else {
+        pollInterval = detail.getPollIntervalMillis();
+      }
+      if (currentTime - lastTime > pollInterval) {
+        this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
+        this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
+        this.lastScheduled.put(detail.getUrl(), currentTime);
+      }
     }
+  }
 }
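
For reference, a sketch of wiring the scheduler up directly, based on the constructors and the isComplete() accessor in this hunk; the single feed entry is a placeholder, and passing a peroid of 0 takes the run-once branch of run().

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;

import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RssFeedSchedulerSketch {

  public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(2);
    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();

    FeedDetails detail = new FeedDetails();
    detail.setUrl("http://example.org/feed.rss");  // placeholder feed

    // A peroid of zero or less schedules every feed exactly once and then marks the
    // scheduler complete; the collection itself runs on the supplied executor.
    RssFeedScheduler scheduler = new RssFeedScheduler(
        service, Collections.singletonList(detail), queue, 0);
    scheduler.run();

    // Wait for the submitted RssStreamProviderTasks to finish, then drain the queue.
    service.shutdown();
    service.awaitTermination(30, TimeUnit.SECONDS);

    StreamsDatum datum;
    while ((datum = queue.poll()) != null) {
      System.out.println(datum.getDocument());
    }
  }
}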

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index e323f27..1e3aedd 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -18,11 +18,6 @@
 
 package org.apache.streams.rss.serializer;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -30,6 +25,12 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Author;
 import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -40,194 +41,200 @@ import java.util.List;
 
 public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
 
-    private boolean includeRomeExtension;
+  private boolean includeRomeExtension;
 
-    public SyndEntryActivitySerializer() {
-        this(true);
-    }
-
-    public SyndEntryActivitySerializer(boolean includeRomeExtension) {
-        this.includeRomeExtension = includeRomeExtension;
-    }
+  public SyndEntryActivitySerializer() {
+    this(true);
+  }
 
+  public SyndEntryActivitySerializer(boolean includeRomeExtension) {
+    this.includeRomeExtension = includeRomeExtension;
+  }
 
-    @Override
-    public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
-        List<Activity> result = new LinkedList<>();
-        for (ObjectNode node : objectNodes) {
-            result.add(deserialize(node));
-        }
-        return result;
+  @Override
+  public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
+    List<Activity> result = new LinkedList<>();
+    for (ObjectNode node : objectNodes) {
+      result.add(deserialize(node));
     }
-
-    @Override
-    public String serializationFormat() {
-        return "application/streams-provider-rss";
+    return result;
+  }
+
+  @Override
+  public String serializationFormat() {
+    return "application/streams-provider-rss";
+  }
+
+  @Override
+  public ObjectNode serialize(Activity deserialized) {
+    throw new UnsupportedOperationException("Cannot currently serialize to Rome");
+  }
+
+  @Override
+  public Activity deserialize(ObjectNode syndEntry) {
+    return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+  }
+
+  /**
+   * Deserialize an ObjectNode entry into an Activity, optionally adding the Rome extension.
+   * @param entry ObjectNode
+   * @param withExtension whether to add Rome Extension
+   * @return Activity
+   */
+  public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
+    Preconditions.checkNotNull(entry);
+
+    Activity activity = new Activity();
+    Provider provider = buildProvider(entry);
+    ActivityObject actor = buildActor(entry);
+    ActivityObject activityObject = buildActivityObject(entry);
+
+    activityObject.setUrl(provider.getUrl());
+    activityObject.setAuthor(actor.getAuthor());
+
+    activity.setUrl(provider.getUrl());
+    activity.setProvider(provider);
+    activity.setActor(actor);
+    activity.setVerb("post");
+    activity.setId("id:rss:post:" + activity.getUrl());
+
+    JsonNode published = entry.get("publishedDate");
+    if (published != null) {
+      try {
+        activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
+      } catch (Exception ex) {
+        LOGGER.warn("Failed to parse date : {}", published.textValue());
+
+        DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
+        activity.setPublished(now);
+      }
     }
 
-    @Override
-    public ObjectNode serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Rome");
-    }
+    activity.setUpdated(activityObject.getUpdated());
+    activity.setObject(activityObject);
 
-    @Override
-    public Activity deserialize(ObjectNode syndEntry) {
-        return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+    if (withExtension) {
+      activity = addRomeExtension(activity, entry);
     }
 
-    public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
-        Preconditions.checkNotNull(entry);
-
-        Activity activity = new Activity();
-        Provider provider = buildProvider(entry);
-        ActivityObject actor = buildActor(entry);
-        ActivityObject activityObject = buildActivityObject(entry);
-
-        activityObject.setUrl(provider.getUrl());
-        activityObject.setAuthor(actor.getAuthor());
-
-        activity.setUrl(provider.getUrl());
-        activity.setProvider(provider);
-        activity.setActor(actor);
-        activity.setVerb("post");
-        activity.setId("id:rss:post:" + activity.getUrl());
-
-        JsonNode published = entry.get("publishedDate");
-        if (published != null) {
-            try {
-                activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
-            } catch (Exception e) {
-                LOGGER.warn("Failed to parse date : {}", published.textValue());
-
-                DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
-                activity.setPublished(now);
-            }
-        }
-
-        activity.setUpdated(activityObject.getUpdated());
-        activity.setObject(activityObject);
-
-        if (withExtension) {
-            activity = addRomeExtension(activity, entry);
-        }
-
-        return activity;
+    return activity;
+  }
+
+  /**
+   * Given an RSS entry, extract the author and actor information and return it
+   * in an actor object.
+   *
+   * @param entry entry
+   * @return $.actor
+   */
+  private ActivityObject buildActor(ObjectNode entry) {
+    ActivityObject actor = new ActivityObject();
+    Author author = new Author();
+
+    if (entry.get("author") != null) {
+      author.setId(entry.get("author").textValue());
+      author.setDisplayName(entry.get("author").textValue());
+
+      actor.setAuthor(author);
+      String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
+
+      actor.setId("id:rss:" + uriToSet + ":" + author.getId());
+      actor.setDisplayName(author.getDisplayName());
     }
 
-    /**
-     * Given an RSS entry, extra out the author and actor information and return it
-     * in an actor object
-     *
-     * @param entry
-     * @return
-     */
-    private ActivityObject buildActor(ObjectNode entry) {
-        ActivityObject actor = new ActivityObject();
-        Author author = new Author();
-
-        if (entry.get("author") != null) {
-            author.setId(entry.get("author").textValue());
-            author.setDisplayName(entry.get("author").textValue());
-
-            actor.setAuthor(author);
-            String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
-
-            actor.setId("id:rss:" + uriToSet + ":" + author.getId());
-            actor.setDisplayName(author.getDisplayName());
-        }
-
-        return actor;
+    return actor;
+  }
+
+  /**
+   * Given an RSS object, build the ActivityObject.
+   *
+   * @param entry ObjectNode
+   * @return $.object
+   */
+  private ActivityObject buildActivityObject(ObjectNode entry) {
+    ActivityObject activityObject = new ActivityObject();
+
+    JsonNode summary = entry.get("description");
+    if (summary != null) {
+      activityObject.setSummary(summary.textValue());
+    } else if ((summary = entry.get("title")) != null) {
+      activityObject.setSummary(summary.textValue());
     }
 
-    /**
-     * Given an RSS object, build the ActivityObject
-     *
-     * @param entry
-     * @return
-     */
-    private ActivityObject buildActivityObject(ObjectNode entry) {
-        ActivityObject activityObject = new ActivityObject();
+    return activityObject;
+  }
 
-        JsonNode summary = entry.get("description");
-        if (summary != null)
-            activityObject.setSummary(summary.textValue());
-        else if((summary = entry.get("title")) != null) {
-            activityObject.setSummary(summary.textValue());
-        }
+  /**
+   * Given an RSS object, build and return the Provider object.
+   *
+   * @param entry ObjectNode
+   * @return $.provider
+   */
+  private Provider buildProvider(ObjectNode entry) {
+    Provider provider = new Provider();
 
-        return activityObject;
-    }
+    String link = null;
+    String uri = null;
+    String resourceLocation = null;
 
-    /**
-     * Given an RSS object, build and return the Provider object
-     *
-     * @param entry
-     * @return
-     */
-    private Provider buildProvider(ObjectNode entry) {
-        Provider provider = new Provider();
-
-        String link = null;
-        String uri = null;
-        String resourceLocation = null;
-
-        if (entry.get("link") != null)
-            link = entry.get("link").textValue();
-        if (entry.get("uri") != null)
-            uri = entry.get("uri").textValue();
-
-        /*
-         * Order of precedence for resourceLocation selection
-         *
-         * 1. Valid URI
-         * 2. Valid Link
-         * 3. Non-null URI
-         * 4. Non-null Link
-         */
-        if(isValidResource(uri))
-            resourceLocation = uri;
-        else if(isValidResource(link))
-            resourceLocation = link;
-        else if(uri != null || link != null) {
-            resourceLocation = (uri != null) ? uri : link;
-        }
-
-        provider.setId("id:providers:rss");
-        provider.setUrl(resourceLocation);
-        provider.setDisplayName("RSS");
-
-        return provider;
+    if (entry.get("link") != null) {
+      link = entry.get("link").textValue();
     }
-
-    /**
-     * Tests whether or not the passed in resource is a valid URI
-     * @param resource
-     * @return boolean of whether or not the resource is valid
-     */
-    private boolean isValidResource(String resource) {
-        return resource != null && (resource.startsWith("http") || resource.startsWith("www"));
+    if (entry.get("uri") != null) {
+      uri = entry.get("uri").textValue();
     }
-
-    /**
-     * Given an RSS object and an existing activity,
-     * add the Rome extension to that activity and return it
+    /*
+     * Order of precedence for resourceLocation selection
      *
-     * @param activity
-     * @param entry
-     * @return
+     * 1. Valid URI
+     * 2. Valid Link
+     * 3. Non-null URI
+     * 4. Non-null Link
      */
-    private Activity addRomeExtension(Activity activity, ObjectNode entry) {
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-        ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
-        ObjectNode extensions = JsonNodeFactory.instance.objectNode();
-
-        extensions.put("rome", entry);
-        activityRoot.put("extensions", extensions);
-
-        activity = mapper.convertValue(activityRoot, Activity.class);
-
-        return activity;
+    if (isValidResource(uri)) {
+      resourceLocation = uri;
+    } else if (isValidResource(link)) {
+      resourceLocation = link;
+    } else if (uri != null || link != null) {
+      resourceLocation = (uri != null) ? uri : link;
     }
+
+    provider.setId("id:providers:rss");
+    provider.setUrl(resourceLocation);
+    provider.setDisplayName("RSS");
+
+    return provider;
+  }
+
+  /**
+   * Tests whether or not the passed in resource is a valid URI.
+   * @param resource resource
+   * @return boolean of whether or not the resource is valid
+   */
+  private boolean isValidResource(String resource) {
+    return resource != null && (resource.startsWith("http") || resource.startsWith("www"));
+  }
+
+  /**
+   * Given an RSS object and an existing activity,
+   * add the Rome extension to that activity and return it.
+   *
+   * @param activity Activity
+   * @param entry ObjectNode
+   * @return Activity
+   */
+  private Activity addRomeExtension(Activity activity, ObjectNode entry) {
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
+    ObjectNode extensions = JsonNodeFactory.instance.objectNode();
+
+    extensions.put("rome", entry);
+    activityRoot.put("extensions", extensions);
+
+    activity = mapper.convertValue(activityRoot, Activity.class);
+
+    return activity;
+  }
 }
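
For reference, a sketch of converting one of the serializer's ObjectNode inputs into an Activity, using only field names that appear in this class ("title", "link", "uri", "author", "publishedDate", "rssFeed"); the values are placeholders.

import org.apache.streams.pojo.json.Activity;
import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SyndEntryActivitySerializerSketch {

  public static void main(String[] args) {
    ObjectNode entry = JsonNodeFactory.instance.objectNode();
    entry.put("title", "Example article");
    entry.put("link", "http://example.org/articles/1");
    entry.put("uri", "http://example.org/articles/1");
    entry.put("author", "example-author");
    entry.put("publishedDate", "2016-11-25T20:25:05Z");
    entry.put("rssFeed", "http://example.org/feed.rss");

    SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();

    // Equivalent to deserialize(entry); the default constructor enables the Rome extension.
    Activity activity = serializer.deserializeWithRomeExtension(entry, true);

    System.out.println(activity.getId());    // id:rss:post:<resource url>
    System.out.println(activity.getVerb());  // post
  }
}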

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
index 1135172..6868bfc 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
@@ -26,7 +26,12 @@ import com.sun.syndication.feed.module.Module;
 import com.sun.syndication.feed.rss.Category;
 import com.sun.syndication.feed.rss.Content;
 import com.sun.syndication.feed.rss.Enclosure;
-import com.sun.syndication.feed.synd.*;
+import com.sun.syndication.feed.synd.SyndContent;
+import com.sun.syndication.feed.synd.SyndEnclosure;
+import com.sun.syndication.feed.synd.SyndEntry;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.feed.synd.SyndImage;
+import com.sun.syndication.feed.synd.SyndLinkImpl;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 import org.slf4j.Logger;
@@ -42,267 +47,284 @@ import java.util.List;
  */
 public class SyndEntrySerializer {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntrySerializer.class);
 
-    public ObjectNode deserialize(SyndEntry entry) {
-        return deserializeRomeEntry(entry);
-    }
-
-
-    public List<ObjectNode> deserializeAll(Collection<SyndEntry> entries) {
-        List<ObjectNode> result = Lists.newLinkedList();
-        for(SyndEntry entry : entries) {
-            result.add(deserialize(entry));
-        }
-        return result;
-    }
+  public ObjectNode deserialize(SyndEntry entry) {
+    return deserializeRomeEntry(entry);
+  }
 
+  private ObjectNode deserializeRomeEntry(SyndEntry entry) {
+    JsonNodeFactory factory = JsonNodeFactory.instance;
+    ObjectNode root = factory.objectNode();
 
+    serializeString(entry.getAuthor(), "author", root);
+    serializeListOfStrings(entry.getAuthors(), "authors", root, factory);
+    serializeCategories(root, factory, entry.getCategories());
+    serializeContents(root, factory, entry.getContents());
+    serializeListOfStrings(entry.getContributors(), "contributors", root, factory);
+    serializeDescription(root, factory, entry.getDescription());
+    serializeEnclosures(root, factory, entry.getEnclosures());
+    serializeForeignMarkUp(root, factory, entry.getForeignMarkup());
+    serializeString(entry.getLink(), "link", root);
+    serializeLinks(root, factory, entry.getLinks());
+    serializeModules(root, factory, entry.getModules());
+    serializeDate(root, entry.getPublishedDate(), "publishedDate");
+    serializeSource(root, factory, entry.getSource());
+    serializeString(entry.getTitle(), "title", root);
+    serializeDate(root, entry.getUpdatedDate(), "updateDate");
+    serializeString(entry.getUri(), "uri", root);
 
-    private ObjectNode deserializeRomeEntry(SyndEntry entry) {
-        JsonNodeFactory factory = JsonNodeFactory.instance;
-        ObjectNode root = factory.objectNode();
+    return root;
+  }
 
-        serializeString(entry.getAuthor(), "author", root);
-        serializeListOfStrings(entry.getAuthors(), "authors", root, factory);
-        serializeCategories(root, factory, entry.getCategories());
-        serializeContents(root, factory, entry.getContents());
-        serializeListOfStrings(entry.getContributors(), "contributors", root, factory);
-        serializeDescription(root, factory, entry.getDescription());
-        serializeEnclosures(root, factory, entry.getEnclosures());
-        serializeForeignMarkUp(root, factory, entry.getForeignMarkup());
-        serializeString(entry.getLink(), "link", root);
-        serializeLinks(root, factory, entry.getLinks());
-        serializeModules(root, factory, entry.getModules());
-        serializeDate(root, entry.getPublishedDate(), "publishedDate");
-        serializeSource(root, factory, entry.getSource());
-        serializeString(entry.getTitle(), "title", root);
-        serializeDate(root, entry.getUpdatedDate(), "updateDate");
-        serializeString(entry.getUri(), "uri", root);
 
-        return root;
+  private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) {
+    if (categories == null || categories.size() == 0) {
+      return;
     }
-
-
-    private void serializeCategories(ObjectNode root, JsonNodeFactory factory, List categories) {
-        if(categories == null || categories.size() == 0)
-            return;
-        ArrayNode cats = factory.arrayNode();
-        for(Object obj : categories) {
-            if(obj instanceof Category) {
-                ObjectNode catNode = factory.objectNode();
-                Category category = (Category) obj;
-                if(category.getDomain() != null)
-                    catNode.put("domain", category.getDomain());
-                if(category.getValue() != null)
-                    catNode.put("value", category.getValue());
-                cats.add(catNode);
-            }
-            else if(obj instanceof com.sun.syndication.feed.atom.Category) {
-                com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj;
-                ObjectNode catNode = factory.objectNode();
-                if(category.getLabel() != null)
-                    catNode.put("label", category.getLabel());
-                if(category.getScheme() != null)
-                    catNode.put("scheme", category.getScheme());
-                if(category.getSchemeResolved() != null)
-                    catNode.put("schemeResolved", category.getSchemeResolved());
-                if(category.getTerm() != null )
-                    catNode.put("term", category.getTerm());
-                cats.add(catNode);
-            }
+    ArrayNode cats = factory.arrayNode();
+    for (Object obj : categories) {
+      if (obj instanceof Category) {
+        ObjectNode catNode = factory.objectNode();
+        Category category = (Category) obj;
+        if (category.getDomain() != null) {
+          catNode.put("domain", category.getDomain());
+        }
+        if (category.getValue() != null) {
+          catNode.put("value", category.getValue());
         }
-        root.put("categories", cats);
+        cats.add(catNode);
+      } else if (obj instanceof com.sun.syndication.feed.atom.Category) {
+        com.sun.syndication.feed.atom.Category category = (com.sun.syndication.feed.atom.Category) obj;
+        ObjectNode catNode = factory.objectNode();
+        if (category.getLabel() != null) {
+          catNode.put("label", category.getLabel());
+        }
+        if (category.getScheme() != null) {
+          catNode.put("scheme", category.getScheme());
+        }
+        if (category.getSchemeResolved() != null) {
+          catNode.put("schemeResolved", category.getSchemeResolved());
+        }
+        if (category.getTerm() != null) {
+          catNode.put("term", category.getTerm());
+        }
+        cats.add(catNode);
+      }
     }
+    root.put("categories", cats);
+  }
 
-    private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) {
-        if(contents == null || contents.size() == 0)
-            return;
-        ArrayNode contentsArray = factory.arrayNode();
-        for(Object obj : contents) {
-            ObjectNode content = factory.objectNode();
-            if(obj instanceof Content) {
-                Content rssContent = (Content) obj;
-                content.put("type", rssContent.getType());
-                content.put("value", rssContent.getValue());
-            }
-            if(obj instanceof com.sun.syndication.feed.atom.Content) {
-                com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj;
-                content.put("type", atomContent.getType());
-                content.put("value", atomContent.getValue());
-                content.put("mode", atomContent.getMode());
-                content.put("src", atomContent.getSrc());
-            }
-            contentsArray.add(content);
-        }
-        root.put("contents", contentsArray);
+  private void serializeContents(ObjectNode root, JsonNodeFactory factory, List contents) {
+    if (contents == null || contents.size() == 0) {
+      return;
+    }
+    ArrayNode contentsArray = factory.arrayNode();
+    for (Object obj : contents) {
+      ObjectNode content = factory.objectNode();
+      if (obj instanceof Content) {
+        Content rssContent = (Content) obj;
+        content.put("type", rssContent.getType());
+        content.put("value", rssContent.getValue());
+      }
+      if (obj instanceof com.sun.syndication.feed.atom.Content) {
+        com.sun.syndication.feed.atom.Content atomContent = (com.sun.syndication.feed.atom.Content) obj;
+        content.put("type", atomContent.getType());
+        content.put("value", atomContent.getValue());
+        content.put("mode", atomContent.getMode());
+        content.put("src", atomContent.getSrc());
+      }
+      contentsArray.add(content);
     }
+    root.put("contents", contentsArray);
+  }
 
-    private void serializeDate(ObjectNode root, Date date, String key) {
-        DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
-        if(date == null)
-            return;
-        root.put(key, formatter.print(date.getTime()));
+  private void serializeDate(ObjectNode root, Date date, String key) {
+    DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
+    if (date == null) {
+      return;
     }
+    root.put(key, formatter.print(date.getTime()));
+  }
 
-    private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) {
-        if(synd == null)
-            return;
-        ObjectNode content = factory.objectNode();
-        if(synd.getValue() != null)
-            content.put("value", synd.getValue());
-        if(synd.getMode() != null)
-            content.put("mode", synd.getMode());
-        if(synd.getType() != null)
-            content.put("type", synd.getType());
-        root.put("description", content);
+  private void serializeDescription(ObjectNode root, JsonNodeFactory factory, SyndContent synd) {
+    if (synd == null) {
+      return;
+    }
+    ObjectNode content = factory.objectNode();
+    if (synd.getValue() != null) {
+      content.put("value", synd.getValue());
     }
+    if (synd.getMode() != null) {
+      content.put("mode", synd.getMode());
+    }
+    if (synd.getType() != null) {
+      content.put("type", synd.getType());
+    }
+    root.put("description", content);
+  }
 
-    private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) {
-        if(enclosures == null || enclosures.size() == 0)
-            return;
-        ArrayNode encls = factory.arrayNode();
-        for(Object obj : enclosures) {
-            if(obj instanceof Enclosure){
-                Enclosure enclosure = (Enclosure) obj;
-                ObjectNode encl = factory.objectNode();
-                if(enclosure.getType() != null)
-                    encl.put("type", enclosure.getType());
-                if(enclosure.getUrl() != null)
-                    encl.put("url", enclosure.getUrl());
-                encl.put("length", enclosure.getLength());
-                encls.add(encl);
-            } else if(obj instanceof SyndEnclosure) {
-                SyndEnclosure enclosure = (SyndEnclosure) obj;
-                ObjectNode encl = factory.objectNode();
-                if(enclosure.getType() != null)
-                    encl.put("type", enclosure.getType());
-                if(enclosure.getUrl() != null)
-                    encl.put("url", enclosure.getUrl());
-                encl.put("length", enclosure.getLength());
-                encls.add(encl);
-            } else {
-                LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString());
-            }
+  private void serializeEnclosures(ObjectNode root, JsonNodeFactory factory, List enclosures) {
+    if (enclosures == null || enclosures.size() == 0) {
+      return;
+    }
+    ArrayNode encls = factory.arrayNode();
+    for (Object obj : enclosures) {
+      if (obj instanceof Enclosure) {
+        Enclosure enclosure = (Enclosure) obj;
+        ObjectNode encl = factory.objectNode();
+        if (enclosure.getType() != null) {
+          encl.put("type", enclosure.getType());
+        }
+        if (enclosure.getUrl() != null) {
+          encl.put("url", enclosure.getUrl());
         }
-        root.put("enclosures", encls);
+        encl.put("length", enclosure.getLength());
+        encls.add(encl);
+      } else if (obj instanceof SyndEnclosure) {
+        SyndEnclosure enclosure = (SyndEnclosure) obj;
+        ObjectNode encl = factory.objectNode();
+        if (enclosure.getType() != null) {
+          encl.put("type", enclosure.getType());
+        }
+        if (enclosure.getUrl() != null) {
+          encl.put("url", enclosure.getUrl());
+        }
+        encl.put("length", enclosure.getLength());
+        encls.add(encl);
+      } else {
+        LOGGER.warn("serializeEnclosures does not handle type : {}", obj.getClass().toString());
+      }
     }
+    root.put("enclosures", encls);
+  }
 
-    private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) {
-        if(foreignMarkUp == null)
-            return;
-        if(foreignMarkUp instanceof String) {
-            root.put("foreignEnclosures", (String) foreignMarkUp);
-        } else if (foreignMarkUp instanceof List) {
-            List foreignList = (List) foreignMarkUp;
-            if(foreignList.size() == 0)
-                return;
-            if(foreignList.get(0) instanceof String) {
-                serializeListOfStrings(foreignList, "foreignEnclosures", root, factory);
-            } else {
-                LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}", ((List)foreignMarkUp).get(0).getClass().toString());
-            }
-        } else {
-            LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}", foreignMarkUp.getClass().toString());
-        }
+  private void serializeForeignMarkUp(ObjectNode root, JsonNodeFactory factory, Object foreignMarkUp) {
+    if (foreignMarkUp == null) {
+      return;
     }
+    if (foreignMarkUp instanceof String) {
+      root.put("foreignEnclosures", (String) foreignMarkUp);
+    } else if (foreignMarkUp instanceof List) {
+      List foreignList = (List) foreignMarkUp;
+      if (foreignList.size() == 0) {
+        return;
+      }
+      if (foreignList.get(0) instanceof String) {
+        serializeListOfStrings(foreignList, "foreignEnclosures", root, factory);
+      } else {
+        LOGGER.debug("SyndEntry.getForeignMarkUp is not of type String. Need to handle the case of class : {}",
+            ((List)foreignMarkUp).get(0).getClass().toString());
+      }
+    } else {
+      LOGGER.debug("SyndEntry.getForeignMarkUp is not of an expected type. Need to handle the case of class : {}",
+          foreignMarkUp.getClass().toString());
+    }
+  }
 
-    private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) {
-        if(image == null)
-            return;
-        ObjectNode imageNode = factory.objectNode();
-        serializeString(image.getDescription(), "description", imageNode);
-        serializeString(image.getLink(), "link", imageNode);
-        serializeString(image.getUrl(), "url", imageNode);
-        serializeString(image.getTitle(), "title", imageNode);
-        root.put("image", imageNode);
+  private void serializeImage(ObjectNode root, JsonNodeFactory factory, SyndImage image) {
+    if (image == null) {
+      return;
     }
+    ObjectNode imageNode = factory.objectNode();
+    serializeString(image.getDescription(), "description", imageNode);
+    serializeString(image.getLink(), "link", imageNode);
+    serializeString(image.getUrl(), "url", imageNode);
+    serializeString(image.getTitle(), "title", imageNode);
+    root.put("image", imageNode);
+  }
 
-    private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) {
-        if(toSerialize == null || toSerialize.size() == 0)
-            return;
-        ArrayNode keyNode = factory.arrayNode();
-        for(Object obj : toSerialize) {
-            if(obj instanceof String) {
-                keyNode.add((String) obj);
-            } else {
-                LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString());
-            }
-        }
-        node.put(key, keyNode);
+  private void serializeListOfStrings(List toSerialize, String key, ObjectNode node, JsonNodeFactory factory) {
+    if (toSerialize == null || toSerialize.size() == 0) {
+      return;
     }
+    ArrayNode keyNode = factory.arrayNode();
+    for (Object obj : toSerialize) {
+      if (obj instanceof String) {
+        keyNode.add((String) obj);
+      } else {
+        LOGGER.debug("Array at Key:{} was expecting item types of String. Received class : {}", key, obj.getClass().toString());
+      }
+    }
+    node.put(key, keyNode);
+  }
 
-    private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
-        if(links == null || links.size() == 0) {
-            return;
-        } else if(links.get(0) instanceof String) {
-            serializeListOfStrings(links, "links", root, factory);
-        } else if(links.get(0) instanceof SyndLinkImpl) {
-            ArrayNode linksArray = factory.arrayNode();
-            SyndLinkImpl syndLink;
-            ObjectNode linkNode;
-            for(Object obj : links) {
-                linkNode = factory.objectNode();
-                syndLink = (SyndLinkImpl) obj;
-                linkNode.put("rel", syndLink.getRel());
-                linkNode.put("href", syndLink.getHref());
-                linkNode.put("type", syndLink.getType());
-                linkNode.put("length", syndLink.getLength());
-                linkNode.put("hrefLang", syndLink.getHreflang());
-                linkNode.put("title", syndLink.getTitle());
-                linksArray.add(linkNode);
-            }
-            root.put("links", linksArray);
-        } else {
-            LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString());
-        }
+  private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
+    if (links == null || links.size() == 0) {
+      return;
+    } else if (links.get(0) instanceof String) {
+      serializeListOfStrings(links, "links", root, factory);
+    } else if (links.get(0) instanceof SyndLinkImpl) {
+      ArrayNode linksArray = factory.arrayNode();
+      SyndLinkImpl syndLink;
+      ObjectNode linkNode;
+      for (Object obj : links) {
+        linkNode = factory.objectNode();
+        syndLink = (SyndLinkImpl) obj;
+        linkNode.put("rel", syndLink.getRel());
+        linkNode.put("href", syndLink.getHref());
+        linkNode.put("type", syndLink.getType());
+        linkNode.put("length", syndLink.getLength());
+        linkNode.put("hrefLang", syndLink.getHreflang());
+        linkNode.put("title", syndLink.getTitle());
+        linksArray.add(linkNode);
+      }
+      root.put("links", linksArray);
+    } else {
+      LOGGER.error("No implementation for handling links of class : {}", links.get(0).getClass().toString());
     }
+  }
 
-    private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) {
-        if(modules == null || modules.size() == 0)
-            return;
-        ArrayNode modulesArray = factory.arrayNode();
-        for(Object obj : modules) {
-            if(obj instanceof Module) {
-                Module mod = (Module) obj;
-                if(mod.getUri() != null)
-                    modulesArray.add(mod.getUri());
-            } else {
-                LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}", obj.getClass().toString());
-            }
+  private void serializeModules(ObjectNode root, JsonNodeFactory factory, List modules) {
+    if (modules == null || modules.size() == 0) {
+      return;
+    }
+    ArrayNode modulesArray = factory.arrayNode();
+    for (Object obj : modules) {
+      if (obj instanceof Module) {
+        Module mod = (Module) obj;
+        if (mod.getUri() != null) {
+          modulesArray.add(mod.getUri());
         }
-        root.put("modules", modulesArray);
+      } else {
+        LOGGER.debug("SyndEntry.getModules() items are not of type Module. Need to handle the case of class : {}",
+            obj.getClass().toString());
+      }
     }
+    root.put("modules", modulesArray);
+  }
 
-    private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) {
-        if(source == null)
-            return;
-        ObjectNode sourceNode = factory.objectNode();
-        serializeString(source.getAuthor(), "author", sourceNode);
-        serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory);
-        serializeCategories(sourceNode, factory, source.getCategories());
-        serializeString(source.getCopyright(), "copyright", sourceNode);
-        serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory);
-        serializeString(source.getDescription(), "description", sourceNode);
-        serializeDescription(sourceNode, factory, source.getDescriptionEx());
-        // source.getEntries(); wtf?
-        serializeString(source.getFeedType(), "feedType", sourceNode);
-        serializeImage(sourceNode, factory, source.getImage());
-        serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup());
-        serializeString(source.getLanguage(), "language", sourceNode);
-        serializeString(source.getLink(), "link", sourceNode);
-        serializeListOfStrings(source.getLinks(), "links", sourceNode, factory);
-        serializeModules(sourceNode, factory, source.getModules());
-        serializeDate(sourceNode, source.getPublishedDate(), "publishedDate");
-        serializeString(source.getTitle(), "title", sourceNode);
-        serializeString(source.getUri(), "uri", sourceNode);
-
-        root.put("source", sourceNode);
+  private void serializeSource(ObjectNode root, JsonNodeFactory factory, SyndFeed source) {
+    if (source == null) {
+      return;
     }
+    ObjectNode sourceNode = factory.objectNode();
+    serializeString(source.getAuthor(), "author", sourceNode);
+    serializeListOfStrings(source.getAuthors(), "authors", sourceNode, factory);
+    serializeCategories(sourceNode, factory, source.getCategories());
+    serializeString(source.getCopyright(), "copyright", sourceNode);
+    serializeListOfStrings(source.getContributors(), "contributors", sourceNode, factory);
+    serializeString(source.getDescription(), "description", sourceNode);
+    serializeDescription(sourceNode, factory, source.getDescriptionEx());
+    // source.getEntries() is not serialized here.
+    serializeString(source.getFeedType(), "feedType", sourceNode);
+    serializeImage(sourceNode, factory, source.getImage());
+    serializeForeignMarkUp(sourceNode, factory, source.getForeignMarkup());
+    serializeString(source.getLanguage(), "language", sourceNode);
+    serializeString(source.getLink(), "link", sourceNode);
+    serializeListOfStrings(source.getLinks(), "links", sourceNode, factory);
+    serializeModules(sourceNode, factory, source.getModules());
+    serializeDate(sourceNode, source.getPublishedDate(), "publishedDate");
+    serializeString(source.getTitle(), "title", sourceNode);
+    serializeString(source.getUri(), "uri", sourceNode);
+
+    root.put("source", sourceNode);
+  }
 
-    private void serializeString(String string, String key, ObjectNode node) {
-        if(string != null && !string.equals(""))
-            node.put(key, string);
+  private void serializeString(String string, String key, ObjectNode node) {
+    if (string != null && !string.equals("")) {
+      node.put(key, string);
     }
+  }
 
 }
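
For illustration, a minimal sketch of how the serializer above can be exercised, assuming Rome's standard SyndFeedInput/XmlReader API and a placeholder feed URL; only deserialize(SyndEntry) comes from the class shown in this diff, everything else is an illustrative assumption:

  import com.fasterxml.jackson.databind.node.ObjectNode;
  import com.sun.syndication.feed.synd.SyndEntry;
  import com.sun.syndication.feed.synd.SyndFeed;
  import com.sun.syndication.io.SyndFeedInput;
  import com.sun.syndication.io.XmlReader;
  import org.apache.streams.rss.serializer.SyndEntrySerializer;

  import java.net.URL;

  public class SyndEntrySerializerExample {
    public static void main(String[] args) throws Exception {
      // Placeholder feed URL; substitute any reachable RSS/Atom feed.
      SyndFeed feed = new SyndFeedInput().build(new XmlReader(new URL("http://example.com/feed.rss")));
      SyndEntrySerializer serializer = new SyndEntrySerializer();
      for (Object obj : feed.getEntries()) {
        // Each entry becomes an ObjectNode keyed by author, title, link, publishedDate, etc.
        ObjectNode node = serializer.deserialize((SyndEntry) obj);
        System.out.println(node.toString());
      }
    }
  }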