You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:04 UTC

[23/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
index cb71c90..6722f3c 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
@@ -15,13 +15,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.rss.provider;
 
 import org.apache.streams.core.StreamsDatum;
+
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URL;
@@ -37,111 +38,113 @@ import static org.junit.Assert.assertTrue;
  */
 public class RssStreamProviderTaskIT {
 
-
-    @Before
-    public void clearPreviouslySeen() {
-        //some test runners run in parallel so needs to be synchronized
-        //if tests are run in parallel test will have undetermined results.
-        synchronized (RssStreamProviderTask.PREVIOUSLY_SEEN) {
-            RssStreamProviderTask.PREVIOUSLY_SEEN.clear();
-        }
+  /**
+   * clearPreviouslySeen.
+   */
+  @Before
+  public void clearPreviouslySeen() {
+    //some test runners run in parallel so needs to be synchronized
+    //if tests are run in parallel test will have undetermined results.
+    synchronized (RssStreamProviderTask.PREVIOUSLY_SEEN) {
+      RssStreamProviderTask.PREVIOUSLY_SEEN.clear();
     }
+  }
 
-    /**
-     * Test that a task can read a valid rss from a url and queue the data
-     * @throws Exception
-     */
-    @Test
-    public void testNonPerpetualNoTimeFramePull() throws Exception {
-        com.healthmarketscience.common.util.resource.Handler.init();
-        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url");
-        Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-        //Test that  it will out previously seen articles
-        queue.clear();
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
-    }
+  /**
+   * Test that a task can read a valid rss from a url and queue the data.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testNonPerpetualNoTimeFramePull() throws Exception {
+    com.healthmarketscience.common.util.resource.Handler.init();
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+    RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url");
+    Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+    //Test that it will output previously seen articles
+    queue.clear();
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
+  }
 
-    /**
-     * Test that perpetual streams will not output previously seen articles
-     * @throws Exception
-     */
-    @Test
-    public void testPerpetualNoTimeFramePull() throws Exception {
-        com.healthmarketscience.common.util.resource.Handler.init();
-        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true);
-        Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-        //Test that it will not out previously seen articles
-        queue.clear();
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals("Expected queue size to be 0", 0, queue.size());
-        assertEquals("Expected batch size to be 20", 20, batch.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-        //Test that not seen urls aren't blocked.
-        queue.clear();
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml"));
-        assertEquals(batch.size(), queue.size());
-        assertEquals("Expected queue size to be 25", 25, queue.size());
-        assertEquals("Expected batch size to be 25", 25, batch.size());
-    }
+  /**
+   * Test that perpetual streams will not output previously seen articles.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testPerpetualNoTimeFramePull() throws Exception {
+    com.healthmarketscience.common.util.resource.Handler.init();
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+    RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true);
+    Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+    //Test that it will not output previously seen articles
+    queue.clear();
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals("Expected queue size to be 0", 0, queue.size());
+    assertEquals("Expected batch size to be 20", 20, batch.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+    //Test that not seen urls aren't blocked.
+    queue.clear();
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml"));
+    assertEquals(batch.size(), queue.size());
+    assertEquals("Expected queue size to be 25", 25, queue.size());
+    assertEquals("Expected batch size to be 25", 25, batch.size());
+  }
 
-    /**
-     * Test that you can task will only output aritcles after a certain published time
-     * @throws Exception
-     */
-    @Test
-    public void testNonPerpetualTimeFramedPull() throws Exception{
-        com.healthmarketscience.common.util.resource.Handler.init();
-        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC);
-        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, false);
-        Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals( 15, queue.size());
-        assertEquals( 20 , batch.size());
-        assertTrue( queue.size() < batch.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-        //Test that  it will out previously seen articles
-        queue.clear();
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals( 15, queue.size());
-        assertEquals( 20 , batch.size());
-        assertTrue( queue.size() < batch.size());
-    }
+  /**
+   * Test that a task will only output articles published after a certain time.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testNonPerpetualTimeFramedPull() throws Exception {
+    com.healthmarketscience.common.util.resource.Handler.init();
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+    DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC);
+    RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, false);
+    Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals( 15, queue.size());
+    assertEquals( 20 , batch.size());
+    assertTrue( queue.size() < batch.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+    //Test that it will output previously seen articles
+    queue.clear();
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals( 15, queue.size());
+    assertEquals( 20 , batch.size());
+    assertTrue( queue.size() < batch.size());
+  }
 
-    /**
-     * Test that task will only output articles after a certain published time that it has not seen before.
-     * @throws Exception
-     */
-    @Test
-    public void testPerpetualTimeFramedPull() throws Exception {
-        com.healthmarketscience.common.util.resource.Handler.init();
-        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC);
-        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, true);
-        Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals( 15, queue.size());
-        assertEquals( 20 , batch.size());
-        assertTrue( queue.size() < batch.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-        //Test that  it will not out put previously seen articles
-        queue.clear();
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
-        assertEquals( 0, queue.size());
-        assertEquals( 20 , batch.size());
-        assertTrue( queue.size() < batch.size());
-        RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
-
-        batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml"));
-        assertTrue( queue.size() < batch.size());
-        assertEquals("Expected queue size to be 0", 3, queue.size());
-        assertEquals("Expected batch size to be 0", 25, batch.size());
-    }
+  /**
+   * Test that task will only output articles after a certain published time that it has not seen before.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testPerpetualTimeFramedPull() throws Exception {
+    com.healthmarketscience.common.util.resource.Handler.init();
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
+    DateTime publishedSince = new DateTime().withYear(2014).withDayOfMonth(5).withMonthOfYear(9).withZone(DateTimeZone.UTC);
+    RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", publishedSince, 10000, true);
+    Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals( 15, queue.size());
+    assertEquals( 20 , batch.size());
+    assertTrue( queue.size() < batch.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+    //Test that it will not output previously seen articles
+    queue.clear();
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
+    assertEquals( 0, queue.size());
+    assertEquals( 20 , batch.size());
+    assertTrue( queue.size() < batch.size());
+    RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
+
+    batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist2.xml"));
+    assertTrue( queue.size() < batch.size());
+    assertEquals("Expected queue size to be 0", 3, queue.size());
+    assertEquals("Expected batch size to be 0", 25, batch.size());
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
index 60b8e0f..08a58d3 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.streams.rss.provider;
 
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.rss.RssStreamConfiguration;
 import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.google.common.collect.Queues;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,85 +37,87 @@ import java.util.concurrent.CountDownLatch;
  */
 public class RssStreamProviderTest extends RandomizedTest {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTest.class);
-
-    @Test
-    public void testRssFeedShutdownsNonPerpetual() throws Exception {
-        RssStreamProvider provider = null;
-        try {
-            final CountDownLatch latch = new CountDownLatch(1);
-            BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue();
-            provider = new RssStreamProvider(new RssStreamConfiguration()) {
-                @Override
-                protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
-                    return new MockScheduler(latch, queue);
-                }
-            };
-            provider.prepare(null);
-            int datumCount = 0;
-            provider.startStream();
-            while (!provider.scheduler.isComplete()) {
-                StreamsResultSet batch = provider.readCurrent();
-                LOGGER.debug("Batch size : {}", batch.size());
-                datumCount += batch.size();
-                Thread.sleep(randomIntBetween(0, 3000));
-            }
-            latch.await();
-
-            //one last pull incase of race condition
-            StreamsResultSet batch = provider.readCurrent();
-            LOGGER.debug("Batch size : {}", batch.size());
-            datumCount += batch.size();
-            if(batch.size() != 0) { //if race condition happened, pull again
-                batch = provider.readCurrent();
-                assertEquals(0, batch.size());
-            }
-
-            assertTrue(provider.scheduler.isComplete());
-            assertEquals(20, datumCount);
-            assertFalse(provider.isRunning());
-            assertEquals(0, datums.size());
-            provider.cleanUp();
-        } finally {
-            if(provider != null)
-                provider.cleanUp();
+  private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderTest.class);
+
+  @Test
+  public void testRssFeedShutdownsNonPerpetual() throws Exception {
+    RssStreamProvider provider = null;
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue();
+      provider = new RssStreamProvider(new RssStreamConfiguration()) {
+        @Override
+        protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
+          return new MockScheduler(latch, queue);
         }
+      };
+      provider.prepare(null);
+      int datumCount = 0;
+      provider.startStream();
+      while (!provider.scheduler.isComplete()) {
+        StreamsResultSet batch = provider.readCurrent();
+        LOGGER.debug("Batch size : {}", batch.size());
+        datumCount += batch.size();
+        Thread.sleep(randomIntBetween(0, 3000));
+      }
+      latch.await();
+
+      //one last pull in case of a race condition
+      StreamsResultSet batch = provider.readCurrent();
+      LOGGER.debug("Batch size : {}", batch.size());
+      datumCount += batch.size();
+      if (batch.size() != 0) {
+        //if race condition happened, pull again
+        batch = provider.readCurrent();
+        assertEquals(0, batch.size());
+      }
+
+      assertTrue(provider.scheduler.isComplete());
+      assertEquals(20, datumCount);
+      assertFalse(provider.isRunning());
+      assertEquals(0, datums.size());
+      provider.cleanUp();
+    } finally {
+      if (provider != null) {
+        provider.cleanUp();
+      }
     }
+  }
 
 
-    private class MockScheduler extends RssFeedScheduler {
+  private class MockScheduler extends RssFeedScheduler {
 
-        private BlockingQueue<StreamsDatum> queue;
-        private CountDownLatch latch;
-        private volatile boolean complete = false;
+    private BlockingQueue<StreamsDatum> queue;
+    private CountDownLatch latch;
+    private volatile boolean complete = false;
 
-        public MockScheduler(CountDownLatch latch, BlockingQueue<StreamsDatum> dataQueue) {
-            super(null, null, dataQueue);
-            this.latch = latch;
-            this.queue = dataQueue;
-        }
+    public MockScheduler(CountDownLatch latch, BlockingQueue<StreamsDatum> dataQueue) {
+      super(null, null, dataQueue);
+      this.latch = latch;
+      this.queue = dataQueue;
+    }
 
-        @Override
-        public void run() {
-            try {
-                for (int i = 0; i < 20; ++i) {
-                    this.queue.put(new StreamsDatum(null));
-                    Thread.sleep(randomIntBetween(0, 5000));
-                }
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            } finally {
-                this.complete = true;
-                this.latch.countDown();
-            }
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 20; ++i) {
+          this.queue.put(new StreamsDatum(null));
+          Thread.sleep(randomIntBetween(0, 5000));
         }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      } finally {
+        this.complete = true;
+        this.latch.countDown();
+      }
+    }
 
 
-        @Override
-        public boolean isComplete() {
-            return this.complete;
-        }
+    @Override
+    public boolean isComplete() {
+      return this.complete;
     }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
index 2bd0b69..830f0e7 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
@@ -15,12 +15,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.rss.provider.perpetual;
 
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.provider.RssStreamProviderTask;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -34,7 +36,6 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for {@link org.apache.streams.rss.provider.perpetual.RssFeedScheduler}
@@ -42,60 +43,60 @@ import static org.mockito.Mockito.when;
 public class RssFeedSchedulerTest {
 
 
-    /**
-     * Test that feeds are scheduled based on elapsed time correctly.
-     * Takes 1 minute to run.
-     */
-    @Test
-    public void testScheduleFeeds() {
-        ExecutorService mockService = mock(ExecutorService.class);
-        final List<String> queuedTasks = new ArrayList<>(5);
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
-                return null;
-            }
-        }).when(mockService).execute(any(Runnable.class));
+  /**
+   * Test that feeds are scheduled based on elapsed time correctly.
+   * Takes 1 minute to run.
+   */
+  @Test
+  public void testScheduleFeeds() {
+    ExecutorService mockService = mock(ExecutorService.class);
+    final List<String> queuedTasks = new ArrayList<>(5);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+        queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
+        return null;
+      }
+    }).when(mockService).execute(any(Runnable.class));
 
-        RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1);
-        scheduler.scheduleFeeds();
-        assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size());
-        assertEquals("Expected Feed 1 to be queued first",  "1", queuedTasks.get(0));
-        assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1));
+    RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1);
+    scheduler.scheduleFeeds();
+    assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size());
+    assertEquals("Expected Feed 1 to be queued first",  "1", queuedTasks.get(0));
+    assertEquals("Expected Feed 2 to be queued second", "2", queuedTasks.get(1));
 
-        safeSleep(1);
-        scheduler.scheduleFeeds();
-        assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size());
-        assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2));
+    safeSleep(1);
+    scheduler.scheduleFeeds();
+    assertEquals("Only feed 1 should have been re-queued", 3, queuedTasks.size());
+    assertEquals("Only feed 1 should have been re-queued", "1", queuedTasks.get(2));
 
-        safeSleep(60 * 1000);
-        scheduler.scheduleFeeds();
-        assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size());
-        assertEquals("1", queuedTasks.get(3));
-        assertEquals("2", queuedTasks.get(4));
-    }
+    safeSleep(60 * 1000);
+    scheduler.scheduleFeeds();
+    assertEquals("Both feeds should have been re-queued", 5, queuedTasks.size());
+    assertEquals("1", queuedTasks.get(3));
+    assertEquals("2", queuedTasks.get(4));
+  }
 
-    private List<FeedDetails> createFeedList() {
-        List<FeedDetails> list = Lists.newLinkedList();
-        FeedDetails fd = new FeedDetails();
-        fd.setPollIntervalMillis(1L);
-        fd.setUrl("1");
-        list.add(fd);
+  private List<FeedDetails> createFeedList() {
+    List<FeedDetails> list = Lists.newLinkedList();
+    FeedDetails fd = new FeedDetails();
+    fd.setPollIntervalMillis(1L);
+    fd.setUrl("1");
+    list.add(fd);
 
-        fd = new FeedDetails();
-        fd.setPollIntervalMillis( 60L * 1000);
-        fd.setUrl("2");
-        list.add(fd);
-        return list;
-    }
+    fd = new FeedDetails();
+    fd.setPollIntervalMillis( 60L * 1000);
+    fd.setUrl("2");
+    list.add(fd);
+    return list;
+  }
 
-    private void safeSleep(long milliseconds) {
-        try {
-            Thread.sleep(milliseconds);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        }
+  private void safeSleep(long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
index ccac8aa..ccd8b74 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
@@ -18,17 +18,16 @@
 
 package org.apache.streams.rss.test;
 
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.rss.FeedDetails;
+import org.apache.streams.rss.RssStreamConfiguration;
+import org.apache.streams.rss.provider.RssStreamProvider;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.FeedDetails;
-import org.apache.streams.rss.RssStreamConfiguration;
-import org.apache.streams.rss.provider.RssStreamProvider;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -53,69 +52,68 @@ import static org.hamcrest.number.OrderingComparison.greaterThan;
  */
 public class RssStreamProviderIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void testRssStreamProvider() throws Exception {
-
-        String configfile = "./target/test-classes/RssStreamProviderIT.conf";
-        String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
-
-        InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        RssStreamConfiguration configuration = new RssStreamConfiguration();
-        List<FeedDetails> feedArray = Lists.newArrayList();
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l));
-                }
-            }
-            configuration.setFeeds(feedArray);
-        } catch( Exception e ) {
-            System.out.println(e);
-            e.printStackTrace();
-            Assert.fail();
+  private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
+
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  @Test
+  public void testRssStreamProvider() throws Exception {
+
+    final String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+    final String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
+
+    InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    RssStreamConfiguration configuration = new RssStreamConfiguration();
+    List<FeedDetails> feedArray = Lists.newArrayList();
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        if (!StringUtils.isEmpty(line)) {
+          feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000L));
         }
+      }
+      configuration.setFeeds(feedArray);
+    } catch ( Exception ex ) {
+      System.out.println(ex);
+      ex.printStackTrace();
+      Assert.fail();
+    }
 
-        Assert.assertThat(configuration.getFeeds().size(), greaterThan(70));
+    Assert.assertThat(configuration.getFeeds().size(), greaterThan(70));
 
-        OutputStream os = new FileOutputStream(configfile);
-        OutputStreamWriter osw = new OutputStreamWriter(os);
-        BufferedWriter bw = new BufferedWriter(osw);
+    OutputStream os = new FileOutputStream(configfile);
+    OutputStreamWriter osw = new OutputStreamWriter(os);
+    BufferedWriter bw = new BufferedWriter(osw);
 
-        // write conf
-        ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class);
-        JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode);
+    // write conf
+    ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class);
+    JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode);
 
-        bw.write(mapper.writeValueAsString(configNode));
-        bw.flush();
-        bw.close();
+    bw.write(mapper.writeValueAsString(configNode));
+    bw.flush();
+    bw.close();
 
-        File config = new File(configfile);
-        assert (config.exists());
-        assert (config.canRead());
-        assert (config.isFile());
+    File config = new File(configfile);
+    assert (config.exists());
+    assert (config.canRead());
+    assert (config.isFile());
 
-        RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+    RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 200);
+    assert (outCounter.getLineNumber() >= 200);
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
index 9def7ac..37833c5 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssTypeConverterTest.java
@@ -18,17 +18,18 @@
 
 package org.apache.streams.rss.test;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.streams.rss.processor.RssTypeConverter;
+
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Test;
 
 /**
  * Tests Serializability of {@link org.apache.streams.rss.processor.RssTypeConverter}
  */
 public class RssTypeConverterTest {
-    @Test
-    public void testSerializability() {
-        RssTypeConverter converter = new RssTypeConverter();
-        RssTypeConverter clone = SerializationUtils.clone(converter);
-    }
+  @Test
+  public void testSerializability() {
+    RssTypeConverter converter = new RssTypeConverter();
+    RssTypeConverter clone = SerializationUtils.clone(converter);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
index b1d5f9d..01f1999 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
@@ -18,15 +18,16 @@
 
 package org.apache.streams.rss.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Author;
 import org.apache.streams.pojo.json.Provider;
 import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
@@ -37,88 +38,94 @@ import java.net.URL;
 import java.util.List;
 import java.util.Scanner;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests ability to convert SyndEntry ObjectNode form to {@link org.apache.streams.rss.processor.RssTypeConverter} form
  */
 public class SyndEntryActivitySerializerIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializerIT.class);
-
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    @Test
-    public void testJsonData() throws Exception {
-        Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt"));
-        List<Activity> activities = Lists.newLinkedList();
-        List<ObjectNode> objects = Lists.newLinkedList();
-
-        SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializerIT.class);
 
-        while(scanner.hasNext()) {
-            String line = scanner.nextLine();
-            LOGGER.debug(line);
-            ObjectNode node = (ObjectNode) mapper.readTree(line);
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-            objects.add(node);
-            activities.add(serializer.deserialize(node));
-        }
+  @Test
+  public void testJsonData() throws Exception {
+    Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt"));
+    List<Activity> activities = Lists.newLinkedList();
+    List<ObjectNode> objects = Lists.newLinkedList();
 
-        assertEquals(11, activities.size());
+    SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
 
-        for(int x = 0; x < activities.size(); x ++) {
-            ObjectNode n = objects.get(x);
-            Activity a = activities.get(x);
+    while (scanner.hasNext()) {
+      String line = scanner.nextLine();
+      LOGGER.debug(line);
+      ObjectNode node = (ObjectNode) mapper.readTree(line);
 
-            testActor(n.get("author").asText(), a.getActor());
-            testAuthor(n.get("author").asText(), a.getObject().getAuthor());
-            testProvider("id:providers:rss", "RSS", a.getProvider());
-            testProviderUrl(a.getProvider());
-            testVerb("post", a.getVerb());
-            testPublished(n.get("publishedDate").asText(), a.getPublished());
-            testUrl(n.get("uri").asText(), n.get("link").asText(), a);
-        }
+      objects.add(node);
+      activities.add(serializer.deserialize(node));
     }
 
-    public void testVerb(String expected, String verb) {
-        assertEquals(expected, verb);
-    }
+    assertEquals(11, activities.size());
 
-    public void testPublished(String expected, DateTime published) {
-        assertEquals(new DateTime(expected, DateTimeZone.UTC), published);
-    }
+    for (int x = 0; x < activities.size(); x++) {
+      ObjectNode objectNode = objects.get(x);
+      Activity activity = activities.get(x);
 
-    public void testActor(String expected, ActivityObject actor) {
-        assertEquals("id:rss:null" + ":" + expected, actor.getId());
-        assertEquals(expected, actor.getDisplayName());
+      testActor(objectNode.get("author").asText(), activity.getActor());
+      testAuthor(objectNode.get("author").asText(), activity.getObject().getAuthor());
+      testProvider("id:providers:rss", "RSS", activity.getProvider());
+      validateProviderUrl(activity.getProvider());
+      testVerb("post", activity.getVerb());
+      testPublished(objectNode.get("publishedDate").asText(), activity.getPublished());
+      testUrl(objectNode.get("uri").asText(), objectNode.get("link").asText(), activity);
     }
-
-    public void testAuthor(String expected, Author author) {
-        assertEquals(expected, author.getDisplayName());
-        assertEquals(expected, author.getId());
-    }
-
-    public void testProvider(String expectedId, String expectedDisplay, Provider provider) {
-        assertEquals(expectedId, provider.getId());
-        assertEquals(expectedDisplay, provider.getDisplayName());
+  }
+
+  public void testVerb(String expected, String verb) {
+    assertEquals(expected, verb);
+  }
+
+  public void testPublished(String expected, DateTime published) {
+    assertEquals(new DateTime(expected, DateTimeZone.UTC), published);
+  }
+
+  public void testActor(String expected, ActivityObject actor) {
+    assertEquals("id:rss:null" + ":" + expected, actor.getId());
+    assertEquals(expected, actor.getDisplayName());
+  }
+
+  public void testAuthor(String expected, Author author) {
+    assertEquals(expected, author.getDisplayName());
+    assertEquals(expected, author.getId());
+  }
+
+  public void testProvider(String expectedId, String expectedDisplay, Provider provider) {
+    assertEquals(expectedId, provider.getId());
+    assertEquals(expectedDisplay, provider.getDisplayName());
+  }
+
+  /**
+   * Asserts that the provider's URL string can be parsed into a URL.
+   * @param provider Provider
+   */
+  public void validateProviderUrl(Provider provider) {
+    URL url = null;
+
+    try {
+      url = new URL(provider.getUrl());
+      url.toURI();
+    } catch (Exception ex) {
+      LOGGER.error("Threw an exception while trying to validate URL: {} - {}", provider.getUrl(), ex);
     }
 
-    public void testProviderUrl(Provider provider) {
-        URL url = null;
+    assertNotNull(url);
+  }
 
-        try {
-            url = new URL(provider.getUrl());
-            url.toURI();
-        } catch(Exception e) {
-            LOGGER.error("Threw an exception while trying to validate URL: {} - {}", provider.getUrl(), e);
-        }
-
-        assertNotNull(url);
-    }
-
-    public void testUrl(String expectedURI, String expectedLink, Activity activity) {
-        assertTrue((expectedURI == activity.getUrl() || expectedLink == activity.getUrl()));
-        assertTrue((expectedURI == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl()));
-    }
+  public void testUrl(String expectedUri, String expectedLink, Activity activity) {
+    assertTrue((expectedUri == activity.getUrl() || expectedLink == activity.getUrl()));
+    assertTrue((expectedUri == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl()));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java
index a38e267..1480308 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/SysomosException.java
@@ -20,44 +20,44 @@
 package org.apache.streams.sysomos;
 
 /**
- * Runtime exception wrapper for Sysomos Errors
+ * Runtime exception wrapper for Sysomos Errors.
  */
 public class SysomosException extends RuntimeException {
 
-    private int errorCode = -1;
+  private int errorCode = -1;
 
-    public SysomosException() {
-        // TODO Auto-generated constructor stub
-    }
+  public SysomosException() {
+    // TODO Auto-generated constructor stub
+  }
 
-    public SysomosException(String arg0) {
-        super(arg0);
-        // TODO Auto-generated constructor stub
-    }
+  public SysomosException(String arg0) {
+    super(arg0);
+    // TODO Auto-generated constructor stub
+  }
 
-    public SysomosException(Throwable arg0) {
-        super(arg0);
-        // TODO Auto-generated constructor stub
-    }
+  public SysomosException(Throwable arg0) {
+    super(arg0);
+    // TODO Auto-generated constructor stub
+  }
 
-    public SysomosException(String arg0, Throwable arg1) {
-        super(arg0, arg1);
-        // TODO Auto-generated constructor stub
-    }
+  public SysomosException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+    // TODO Auto-generated constructor stub
+  }
 
-    public SysomosException(String arg0, int errorCode) {
-        super(arg0);
-        this.errorCode = errorCode;
-    }
+  public SysomosException(String arg0, int errorCode) {
+    super(arg0);
+    this.errorCode = errorCode;
+  }
 
-    public SysomosException(String arg0, Throwable arg1, int errorCode) {
-        super(arg0, arg1);
-        this.errorCode = errorCode;
-    }
+  public SysomosException(String arg0, Throwable arg1, int errorCode) {
+    super(arg0, arg1);
+    this.errorCode = errorCode;
+  }
 
-    public int getErrorCode() {
-        return this.errorCode;
-    }
+  public int getErrorCode() {
+    return this.errorCode;
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
index 5d2a399..c0278cd 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/conversion/SysomosBeatActivityConverter.java
@@ -19,12 +19,13 @@
 
 package org.apache.streams.sysomos.conversion;
 
-import com.sysomos.xml.BeatApi;
-import org.apache.commons.lang.StringUtils;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Provider;
+
+import com.sysomos.xml.BeatApi;
+import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 
 import java.util.HashMap;
@@ -42,103 +43,109 @@ import static org.apache.streams.data.util.ActivityUtil.getProviderId;
  */
 public class SysomosBeatActivityConverter {
 
-    private static final String LANGUAGE_KEY = "LANGUAGE";
-
-    public Activity convert(BeatApi.BeatResponse.Beat beat) {
-        Activity converted = new Activity();
-        converted.setId(beat.getDocid());
-        converted.setVerb("post");
-        converted.setContent(beat.getContent());
-        converted.setTitle(beat.getTitle());
-        converted.setPublished(new DateTime(beat.getTime()));
-        converted.setUrl(beat.getLink());
-        converted.setActor(new ActivityObject());
-        Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(converted);
-        extensions.put("keywords", beat.getContent());
-        setLocation(beat, extensions);
-        setObject(beat, converted);
-        setProvider(beat, converted);
-        setLanguage(mappedTags, extensions);
-        extensions.put("sysomos", beat);
-
-        setChannelSpecificValues(beat, converted, mappedTags);
-
-        return converted;
-    }
-
-    protected void setChannelSpecificValues(BeatApi.BeatResponse.Beat beat, Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
-        String mediaType = beat.getMediaType();
-        String lowerMediaType = mediaType.toLowerCase();
-        ActivityObject actor = converted.getActor();
-        ActivityObject object = converted.getObject();
-        if ("TWITTER".equals(mediaType)) {
-            actor.setId(getPersonId(lowerMediaType, beat.getHost()));
-            actor.setDisplayName(beat.getHost());
-            actor.setUrl("http://twitter.com/" + beat.getHost());
-            object.setObjectType("tweet");
-            object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
-        } else if ("FACEBOOK".equals(mediaType)) {
-            String fbid = mappedTags.containsKey("FBID") ? mappedTags.get("FBID").getValue() : "";
-            actor.setId(getPersonId(lowerMediaType, fbid));
-            actor.setDisplayName(beat.getTitle());
-            actor.setUrl(beat.getHost());
-            object.setObjectType("post");
-            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
-        } else {
-            actor.setId(null);
-            actor.setDisplayName(null);
-            actor.setUrl(null);
-            object.setObjectType("post");
-            object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
-        }
+  private static final String LANGUAGE_KEY = "LANGUAGE";
+
+  /**
+   * Converts a BeatApi.BeatResponse.Beat to an Activity.
+   * @param beat BeatApi.BeatResponse.Beat
+   * @return Activity
+   */
+  public Activity convert(BeatApi.BeatResponse.Beat beat) {
+    Activity converted = new Activity();
+    converted.setId(beat.getDocid());
+    converted.setVerb("post");
+    converted.setContent(beat.getContent());
+    converted.setTitle(beat.getTitle());
+    converted.setPublished(new DateTime(beat.getTime()));
+    converted.setUrl(beat.getLink());
+    converted.setActor(new ActivityObject());
+    Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(converted);
+    extensions.put("keywords", beat.getContent());
+    setLocation(beat, extensions);
+    setObject(beat, converted);
+    setProvider(beat, converted);
+    Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags = mapTags(beat);
+    setLanguage(mappedTags, extensions);
+    extensions.put("sysomos", beat);
+
+    setChannelSpecificValues(beat, converted, mappedTags);
+
+    return converted;
+  }
+
+  protected void setChannelSpecificValues(
+      BeatApi.BeatResponse.Beat beat,
+      Activity converted, Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags) {
+    String mediaType = beat.getMediaType();
+    String lowerMediaType = mediaType.toLowerCase();
+    ActivityObject actor = converted.getActor();
+    ActivityObject object = converted.getObject();
+    if ("TWITTER".equals(mediaType)) {
+      actor.setId(getPersonId(lowerMediaType, beat.getHost()));
+      actor.setDisplayName(beat.getHost());
+      actor.setUrl("http://twitter.com/" + beat.getHost());
+      object.setObjectType("tweet");
+      object.setId(getObjectId(lowerMediaType, "tweet", beat.getTweetid()));
+    } else if ("FACEBOOK".equals(mediaType)) {
+      String fbid = mappedTags.containsKey("FBID") ? mappedTags.get("FBID").getValue() : "";
+      actor.setId(getPersonId(lowerMediaType, fbid));
+      actor.setDisplayName(beat.getTitle());
+      actor.setUrl(beat.getHost());
+      object.setObjectType("post");
+      object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
+    } else {
+      actor.setId(null);
+      actor.setDisplayName(null);
+      actor.setUrl(null);
+      object.setObjectType("post");
+      object.setId(getObjectId(lowerMediaType, "post", String.valueOf(converted.getContent().hashCode())));
     }
+  }
 
-    protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
-        if(mappedTags.containsKey(LANGUAGE_KEY)) {
-            extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
-        }
+  protected void setLanguage(Map<String, BeatApi.BeatResponse.Beat.Tag> mappedTags, Map<String, Object> extensions) {
+    if (mappedTags.containsKey(LANGUAGE_KEY)) {
+      extensions.put(LANGUAGE_EXTENSION, mappedTags.get(LANGUAGE_KEY).getValue());
     }
-
-    protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
-        ActivityObject object = new ActivityObject();
-        converted.setObject(object);
-        object.setUrl(beat.getLink());
-        object.setContent(beat.getContent());
-    }
-
-    @SuppressWarnings("unchecked")
-    protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
-        Map<String, Object> location;
-        String country = beat.getLocation().getCountry();
-        if(StringUtils.isNotBlank(country)) {
-            if (extensions.containsKey(LOCATION_EXTENSION)) {
-                location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
-            } else {
-                location = new HashMap<>();
-                extensions.put(LOCATION_EXTENSION, location);
-            }
-            location.put(LOCATION_EXTENSION_COUNTRY, country);
-        }
+  }
+
+  protected void setObject(BeatApi.BeatResponse.Beat beat, Activity converted) {
+    ActivityObject object = new ActivityObject();
+    converted.setObject(object);
+    object.setUrl(beat.getLink());
+    object.setContent(beat.getContent());
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void setLocation(BeatApi.BeatResponse.Beat beat, Map<String, Object> extensions) {
+    Map<String, Object> location;
+    String country = beat.getLocation().getCountry();
+    if (StringUtils.isNotBlank(country)) {
+      if (extensions.containsKey(LOCATION_EXTENSION)) {
+        location = (Map<String, Object>) extensions.get(LOCATION_EXTENSION);
+      } else {
+        location = new HashMap<>();
+        extensions.put(LOCATION_EXTENSION, location);
+      }
+      location.put(LOCATION_EXTENSION_COUNTRY, country);
     }
-
-    protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
-        Provider provider = new Provider();
-        String mediaType = beat.getMediaType().toLowerCase();
-        provider.setId(getProviderId(mediaType));
-        provider.setDisplayName(StringUtils.capitalize(mediaType));
-        converted.setProvider(provider);
+  }
+
+  protected void setProvider(BeatApi.BeatResponse.Beat beat, Activity converted) {
+    Provider provider = new Provider();
+    String mediaType = beat.getMediaType().toLowerCase();
+    provider.setId(getProviderId(mediaType));
+    provider.setDisplayName(StringUtils.capitalize(mediaType));
+    converted.setProvider(provider);
+  }
+
+  protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
+    Map<String, BeatApi.BeatResponse.Beat.Tag> tags = new HashMap<>();
+    for (BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
+      if (tag.getSystemType() != null) {
+        tags.put(tag.getSystemType().trim(), tag);
+      }
     }
-
-    protected Map<String, BeatApi.BeatResponse.Beat.Tag> mapTags(BeatApi.BeatResponse.Beat beat) {
-        Map<String, BeatApi.BeatResponse.Beat.Tag> tags = new HashMap<>();
-        for(BeatApi.BeatResponse.Beat.Tag tag : beat.getTag()) {
-            if(tag.getSystemType() != null) {
-                tags.put(tag.getSystemType().trim(), tag);
-            }
-        }
-        return tags;
-    }
-
+    return tags;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java
index 5915140..eede9f4 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/HeartbeatInfo.java
@@ -16,122 +16,116 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.sysomos.data;
 
-import org.apache.streams.sysomos.data.SysomosTagDefinition;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
 
 /**
- * Represents Heatbeat metadata from the Sysomos API
+ * Represents Heartbeat metadata from the Sysomos API.
  */
 public class HeartbeatInfo {
 
-    private Document doc;
-    private List<SysomosTagDefinition> tags;
-
-    public HeartbeatInfo(String xmlString) throws Exception {
-        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
-        DocumentBuilder docBuilder = dbFactory.newDocumentBuilder();
-        this.doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(xmlString.getBytes("utf-8"))));
-        this.tags = new ArrayList<SysomosTagDefinition>();
-        createTagDefinitions();
-    }
-
-
-    private void createTagDefinitions() {
-        this.tags = new ArrayList<SysomosTagDefinition>();
-        NodeList tagList = this.doc.getElementsByTagName("tag");
-
-        for(int i=0; i < tagList.getLength(); ++i) {
-            Node tag =  tagList.item(i);
-            SysomosTagDefinition tagDefinition = createSysomosTagDefinitionFromNode(tag);
-            if(this.hasTagName(tagDefinition.getTagName())) {
-                SysomosTagDefinition otherTag = this.getTagWithTagName(tagDefinition.getTagName());
-                if(!otherTag.getDisplayName().equals(tagDefinition.getDisplayName())) {
-                    throw new RuntimeException("A single tag ("+otherTag.getTagName()+") has multiple display names ("+otherTag.getDisplayName()+" , "+tagDefinition.getDisplayName()+")");
-                }
-                else {
-                    List<String> otherQueries = otherTag.getQueries();
-                    for(String query : tagDefinition.getQueries()) {
-                        if(!otherQueries.contains(query)) {
-                            otherTag.addQuery(query);
-                        }
-                    }
-                }
-            }
-            else {
-                this.tags.add(tagDefinition);
-            }
-
-        }
-    }
-
-    private SysomosTagDefinition createSysomosTagDefinitionFromNode(Node tag) {
-        Element tagElement = (Element) tag;
-        SysomosTagDefinition tagDefinition = new SysomosTagDefinition(tagElement.getElementsByTagName("name").item(0).getTextContent(),
-                tagElement.getElementsByTagName("displayName").item(0).getTextContent());
-        NodeList taggingRule = tagElement.getElementsByTagName("taggingRule");
-        for(int i=0; i < taggingRule.getLength(); ++i) {
-            Element rule = (Element) taggingRule.item(i);
-            NodeList queries = rule.getElementsByTagName("query");
-            for(int j=0; j < queries.getLength(); ++j) {
-                Element query = (Element) queries.item(j);
-                tagDefinition.addQuery(query.getTextContent());
+  private Document doc;
+  private List<SysomosTagDefinition> tags;
+
+  /**
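+   * Parses the given XML string and builds the tag definitions it contains.
+   * @param xmlString XML document as a string; it is read as UTF-8 bytes
+   * @throws Exception if the XML parser cannot be created or the document cannot be parsed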
+   */
+  public HeartbeatInfo(String xmlString) throws Exception {
+    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+    DocumentBuilder docBuilder = dbFactory.newDocumentBuilder();
+    this.doc = docBuilder.parse(new InputSource(new ByteArrayInputStream(xmlString.getBytes("utf-8"))));
+    this.tags = new ArrayList<SysomosTagDefinition>();
+    createTagDefinitions();
+  }
+
+  private void createTagDefinitions() {
+    this.tags = new ArrayList<SysomosTagDefinition>();
+    NodeList tagList = this.doc.getElementsByTagName("tag");
+
+    for (int i = 0; i < tagList.getLength(); ++i) {
+      Node tag =  tagList.item(i);
+      SysomosTagDefinition tagDefinition = createSysomosTagDefinitionFromNode(tag);
+      if (this.hasTagName(tagDefinition.getTagName())) {
+        SysomosTagDefinition otherTag = this.getTagWithTagName(tagDefinition.getTagName());
+        if (!otherTag.getDisplayName().equals(tagDefinition.getDisplayName())) {
+          throw new RuntimeException("A single tag ("
+              + otherTag.getTagName()
+              + ") has multiple display names ("
+              + otherTag.getDisplayName()
+              + " , "
+              + tagDefinition.getDisplayName()
+              + ")");
+        } else {
+          List<String> otherQueries = otherTag.getQueries();
+          for (String query : tagDefinition.getQueries()) {
+            if (!otherQueries.contains(query)) {
+              otherTag.addQuery(query);
             }
+          }
         }
-        return tagDefinition;
-    }
+      } else {
+        this.tags.add(tagDefinition);
+      }
 
-    public boolean hasTagName(String tagName) {
-        for(SysomosTagDefinition tag : this.tags) {
-            if(tag.hasTagName(tagName)) {
-                return true;
-            }
-        }
-        return false;
     }
-
-    public SysomosTagDefinition getTagWithTagName(String tagName) {
-        for(SysomosTagDefinition tag : this.tags) {
-            if(tag.hasTagName(tagName)) {
-                return tag;
-            }
-        }
-        return null;
+  }
+
+  private SysomosTagDefinition createSysomosTagDefinitionFromNode(Node tag) {
+    Element tagElement = (Element) tag;
+    SysomosTagDefinition tagDefinition = new SysomosTagDefinition(tagElement.getElementsByTagName("name").item(0).getTextContent(),
+        tagElement.getElementsByTagName("displayName").item(0).getTextContent());
+    NodeList taggingRule = tagElement.getElementsByTagName("taggingRule");
+    for (int i = 0; i < taggingRule.getLength(); ++i) {
+      Element rule = (Element) taggingRule.item(i);
+      NodeList queries = rule.getElementsByTagName("query");
+      for (int j = 0; j < queries.getLength(); ++j) {
+        Element query = (Element) queries.item(j);
+        tagDefinition.addQuery(query.getTextContent());
+      }
     }
-
-    public boolean hasTagWithDisplayName(String displayName) {
-        for(SysomosTagDefinition tag : this.tags) {
-            if(tag.hasDisplayName(displayName)) {
-                return true;
-            }
-        }
-        return false;
+    return tagDefinition;
+  }
+
+  /**
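+   * Checks whether any tag definition held by this object has the given tag name.
+   * @param tagName tag name to look for
+   * @return true if a tag definition with that tag name exists, false otherwise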
+   */
+  public boolean hasTagName(String tagName) {
+    for (SysomosTagDefinition tag : this.tags) {
+      if (tag.hasTagName(tagName)) {
+        return true;
+      }
     }
-
-    public SysomosTagDefinition getTagWithDisplayName(String displayName) {
-        for(SysomosTagDefinition tag : this.tags) {
-            if(tag.hasDisplayName(displayName)) {
-                return tag;
-            }
-        }
-        return null;
-    }
-
-    public List<SysomosTagDefinition> getTagDefinitions() {
-        List<SysomosTagDefinition> result = new ArrayList<SysomosTagDefinition>();
-        result.addAll(this.tags);
-        return result;
+    return false;
+  }
+
+  /**
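+   * Finds the first tag definition held by this object that has the given tag name.
+   * @param tagName tag name to look for
+   * @return the matching SysomosTagDefinition, or null if none matches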
+   */
+  public SysomosTagDefinition getTagWithTagName(String tagName) {
+    for (SysomosTagDefinition tag : this.tags) {
+      if (tag.hasTagName(tagName)) {
+        return tag;
+      }
     }
+    return null;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java
index a7a8cd4..889db31 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/data/SysomosTagDefinition.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.sysomos.data;
 
 import java.util.ArrayList;
@@ -26,66 +27,75 @@ import java.util.List;
  */
 public class SysomosTagDefinition {
 
-    private String tagName;
-    private String displayName;
-    private List<String> queries;
+  private String tagName;
+  private String displayName;
+  private List<String> queries;
 
-    public SysomosTagDefinition(String tagName, String displayName) {
-        this.tagName = tagName;
-        this.displayName = displayName;
-        this.queries = new ArrayList<String>();
-    }
+  /**
+   * Creates a tag definition with the given names and an empty list of queries.
+   * @param tagName the tag name
+   * @param displayName the display name
+   */
+  public SysomosTagDefinition(String tagName, String displayName) {
+    this.tagName = tagName;
+    this.displayName = displayName;
+    this.queries = new ArrayList<String>();
+  }
 
-    public String getTagName() {
-        return this.tagName;
-    }
+  public String getTagName() {
+    return this.tagName;
+  }
 
-    public String getDisplayName() {
-        return this.displayName;
-    }
+  public String getDisplayName() {
+    return this.displayName;
+  }
 
-    public List<String> getQueries() {
-        List<String> result = new ArrayList<String>();
-        result.addAll(this.queries);
-        return result;
-    }
+  /**
+   * Returns a copy of the queries; changes to the returned list do not affect this tag.
+   * @return a new list containing the queries
+   */
+  public List<String> getQueries() {
+    List<String> result = new ArrayList<String>();
+    result.addAll(this.queries);
+    return result;
+  }
 
-    public void addQuery(String query) {
-        this.queries.add(query);
-    }
+  public void addQuery(String query) {
+    this.queries.add(query);
+  }
 
-    public boolean hasTagName(String tagName) {
-        return this.tagName.equals(tagName);
-    }
+  public boolean hasTagName(String tagName) {
+    return this.tagName.equals(tagName);
+  }
 
-    public boolean hasQuery(String query) {
-        return this.queries.contains(query);
-    }
+  public boolean hasQuery(String query) {
+    return this.queries.contains(query);
+  }
 
-    public boolean hasDisplayName(String displayName) {
-        return this.displayName.equals(displayName);
-    }
+  public boolean hasDisplayName(String displayName) {
+    return this.displayName.equals(displayName);
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if(!(o instanceof SysomosTagDefinition)) {
-            return false;
-        }
-        SysomosTagDefinition that = (SysomosTagDefinition) o;
-        if(!this.tagName.equals(that.tagName)) {
-            return false;
-        }
-        if(!this.displayName.equals(that.displayName)) {
-            return false;
-        }
-        if(this.queries.size() != that.queries.size()) {
-            return false;
-        }
-        for(int i=0; i < this.queries.size(); ++i) {
-            if(!that.queries.contains(this.queries.get(i))) {
-                return false;
-            }
-        }
-        return true;
+  @Override
+  public boolean equals(Object object) {
+    if (!(object instanceof SysomosTagDefinition)) {
+      return false;
+    }
+    SysomosTagDefinition that = (SysomosTagDefinition) object;
+    if (!this.tagName.equals(that.tagName)) {
+      return false;
+    }
+    if (!this.displayName.equals(that.displayName)) {
+      return false;
+    }
+    if (this.queries.size() != that.queries.size()) {
+      return false;
+    }
+    for (int i = 0; i < this.queries.size(); ++i) {
+      if (!that.queries.contains(this.queries.get(i))) {
+        return false;
+      }
     }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
index 3ffd0b3..48615eb 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/processor/SysomosTypeConverter.java
@@ -19,45 +19,46 @@
 
 package org.apache.streams.sysomos.processor;
 
-import com.google.common.collect.Lists;
-import com.sysomos.xml.BeatApi;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.sysomos.conversion.SysomosBeatActivityConverter;
 
+import com.google.common.collect.Lists;
+import com.sysomos.xml.BeatApi;
+
 import java.util.List;
 
 /**
- * Stream processor that converts Sysomos type to Activity
+ * Stream processor that converts Sysomos type to Activity.
  */
 public class SysomosTypeConverter implements StreamsProcessor {
 
-    public final static String STREAMS_ID = "SysomosTypeConverter";
+  public static final String STREAMS_ID = "SysomosTypeConverter";
 
-    private SysomosBeatActivityConverter converter;
+  private SysomosBeatActivityConverter converter;
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        if(entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
-            entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
-            return Lists.newArrayList(entry);
-        } else {
-            return Lists.newArrayList();
-        }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    if (entry.getDocument() instanceof BeatApi.BeatResponse.Beat) {
+      entry.setDocument(converter.convert((BeatApi.BeatResponse.Beat)entry.getDocument()));
+      return Lists.newArrayList(entry);
+    } else {
+      return Lists.newArrayList();
     }
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        converter = new SysomosBeatActivityConverter();
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    converter = new SysomosBeatActivityConverter();
+  }
 
-    @Override
-    public void cleanUp() {
-        //NOP
-    }
+  @Override
+  public void cleanUp() {
+    //NOP
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
index 28f33df..de604b4 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/AbstractRequestBuilder.java
@@ -19,40 +19,41 @@
 
 package org.apache.streams.sysomos.provider;
 
-import com.sysomos.xml.BeatApi;
-import com.sysomos.xml.ObjectFactory;
 import org.apache.streams.sysomos.SysomosException;
 import org.apache.streams.sysomos.util.SysomosUtils;
+
+import com.sysomos.xml.BeatApi;
+import com.sysomos.xml.ObjectFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.StringReader;
+import java.net.URL;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.net.URL;
 
 /**
  * Defines a common pattern for requesting data from the Sysomos API.
  */
 public abstract class AbstractRequestBuilder implements RequestBuilder {
-    private final static Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRequestBuilder.class);
 
-    /**
-     * Executes the request to the Sysomos Heartbeat API and returns a valid response
-     */
-    public BeatApi.BeatResponse execute() {
-        URL url = this.getRequestUrl();
-        try {
-            String xmlResponse = SysomosUtils.queryUrl(url);
-            JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
-            Unmarshaller unmarshaller = context.createUnmarshaller();
-            BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
-            return beatApi.getBeatResponse();
-        } catch (JAXBException e) {
-            LOGGER.error("Unable to unmarshal XML content");
-            throw new SysomosException("Unable to unmarshal XML content", e);
-        }
+  /**
+   * Executes the request to the Sysomos Heartbeat API and returns a valid response.
+   */
+  public BeatApi.BeatResponse execute() {
+    URL url = this.getRequestUrl();
+    try {
+      String xmlResponse = SysomosUtils.queryUrl(url);
+      JAXBContext context = JAXBContext.newInstance(ObjectFactory.class);
+      Unmarshaller unmarshaller = context.createUnmarshaller();
+      BeatApi beatApi = (BeatApi) unmarshaller.unmarshal(new StringReader(xmlResponse));
+      return beatApi.getBeatResponse();
+    } catch (JAXBException ex) {
+      LOGGER.error("Unable to unmarshal XML content");
+      throw new SysomosException("Unable to unmarshal XML content", ex);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
index 178014a..7ae47cf 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/ContentRequestBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.streams.sysomos.provider;
 
 import org.apache.streams.sysomos.SysomosException;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.net.MalformedURLException;
 import java.net.URL;
 
-import static org.apache.streams.sysomos.util.SysomosUtils.*;
+import static org.apache.streams.sysomos.util.SysomosUtils.SYSOMOS_DATE_FORMATTER;
 
 /**
  * Builds requests for the Sysomos Heartbeat Content API.  This is the preferred method of
@@ -35,102 +36,102 @@ import static org.apache.streams.sysomos.util.SysomosUtils.*;
  */
 public class ContentRequestBuilder extends AbstractRequestBuilder implements RequestBuilder {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
-
-    private String baseUrl;
-    private String hid;
-    private String addedAfter;
-    private String addedBefore;
-    private String size;
-    private String offset;
-    private String apiKey;
-
-    /**
-     * The max number of items you are allowed to get per request.
-     */
-    public static final int MAX_ALLOWED_PER_REQUEST = 10000;
-
-    /**
-     * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL
-     * @param baseUrl the base URL for the sysomos API
-     * @param apiKey the API key generated by Sysomos for authorization
-     */
-    protected ContentRequestBuilder(String baseUrl, String apiKey) {
-        this.baseUrl = baseUrl;
-        this.apiKey = apiKey;
-    }
-
-    /**
-     * Gets the Request URL based on the local fields
-     * @return a valid URL for the Sysomos API or an exception
-     */
-    @Override
-    public URL getRequestUrl()  {
-        StringBuilder url = new StringBuilder();
-        url.append(this.baseUrl);
-        url.append("v1/heartbeat/content?");
-        url.append("apiKey=");
-        url.append(this.apiKey);
-        url.append("&hid=");
-        url.append(this.hid);
-        if (size != null) {
-            url.append("&size=");
-            url.append(this.size);
-        }
-        if (this.addedAfter != null) {
-            url.append("&addedAfter=");
-            url.append(this.addedAfter);
-        }
-        if (this.addedBefore != null) {
-            url.append("&addedBefore=");
-            url.append(this.addedBefore);
-        }
-        if (this.offset != null) {
-            url.append("&offset=");
-            url.append(this.offset);
-        }
-        String urlString = url.toString();
-        LOGGER.debug("Constructed Request URL: {}", urlString);
-        try {
-            return new URL(urlString);
-        } catch (MalformedURLException e) {
-            throw new SysomosException("Malformed Request URL.  Check Request Builder parameters", e);
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(ContentRequestBuilder.class);
+
+  private String baseUrl;
+  private String hid;
+  private String addedAfter;
+  private String addedBefore;
+  private String size;
+  private String offset;
+  private String apiKey;
+
+  /**
+   * The max number of items you are allowed to get per request.
+   */
+  public static final int MAX_ALLOWED_PER_REQUEST = 10000;
+
+  /**
+   * Constructs a new ContentRequestBuilder for the specified API key and Sysomos URL.
+   * @param baseUrl the base URL for the sysomos API
+   * @param apiKey the API key generated by Sysomos for authorization
+   */
+  protected ContentRequestBuilder(String baseUrl, String apiKey) {
+    this.baseUrl = baseUrl;
+    this.apiKey = apiKey;
+  }
+
+  /**
+   * Gets the Request URL based on the local fields.
+   * @return a valid URL for the Sysomos API; a SysomosException is thrown if it is malformed
+   */
+  @Override
+  public URL getRequestUrl()  {
+    StringBuilder url = new StringBuilder();
+    url.append(this.baseUrl);
+    url.append("v1/heartbeat/content?");
+    url.append("apiKey=");
+    url.append(this.apiKey);
+    url.append("&hid=");
+    url.append(this.hid);
+    if (size != null) {
+      url.append("&size=");
+      url.append(this.size);
     }
-
-    @Override
-    public RequestBuilder setHeartBeatId(int hid) {
-        return setHeartBeatId(Integer.toString(hid));
+    if (this.addedAfter != null) {
+      url.append("&addedAfter=");
+      url.append(this.addedAfter);
     }
-
-    @Override
-    public RequestBuilder setHeartBeatId(String hid) {
-        this.hid = hid;
-        return this;
+    if (this.addedBefore != null) {
+      url.append("&addedBefore=");
+      url.append(this.addedBefore);
     }
-
-    @Override
-    public RequestBuilder setAddedAfterDate(DateTime afterDate) {
-        this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
-        return this;
+    if (this.offset != null) {
+      url.append("&offset=");
+      url.append(this.offset);
     }
-
-    @Override
-    public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
-        this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setReturnSetSize(long size) {
-        this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
-        return this;
-    }
-
-    @Override
-    public RequestBuilder setOffset(long offset) {
-        this.offset = Long.toString(offset);
-        return this;
+    String urlString = url.toString();
+    LOGGER.debug("Constructed Request URL: {}", urlString);
+    try {
+      return new URL(urlString);
+    } catch (MalformedURLException ex) {
+      throw new SysomosException("Malformed Request URL.  Check Request Builder parameters", ex);
     }
+  }
+
+  @Override
+  public RequestBuilder setHeartBeatId(int hid) {
+    return setHeartBeatId(Integer.toString(hid));
+  }
+
+  @Override
+  public RequestBuilder setHeartBeatId(String hid) {
+    this.hid = hid;
+    return this;
+  }
+
+  @Override
+  public RequestBuilder setAddedAfterDate(DateTime afterDate) {
+    this.addedAfter = SYSOMOS_DATE_FORMATTER.print(afterDate);
+    return this;
+  }
+
+  @Override
+  public RequestBuilder setAddedBeforeDate(DateTime beforeDate) {
+    this.addedBefore = SYSOMOS_DATE_FORMATTER.print(beforeDate);
+    return this;
+  }
+
+  @Override
+  public RequestBuilder setReturnSetSize(long size) {
+    this.size = Long.toString(Math.min(size, MAX_ALLOWED_PER_REQUEST));
+    return this;
+  }
+
+  @Override
+  public RequestBuilder setOffset(long offset) {
+    this.offset = Long.toString(offset);
+    return this;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
index 0e12025..53887af 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/RequestBuilder.java
@@ -25,72 +25,73 @@ import org.joda.time.DateTime;
 import java.net.URL;
 
 /**
- * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion
+ * Simplifying abstraction that aids in building a request to the Sysomos API in a chained fashion.
  */
 public interface RequestBuilder {
-    /**
-     * Sets the date after which documents should be returned from Sysomos
-     * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setAddedAfterDate(DateTime afterDate);
 
-    /**
-     * Sets the date before which documents should be returned from Sysomos
-     * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setAddedBeforeDate(DateTime beforeDate);
+  /**
+   * Sets the date after which documents should be returned from Sysomos.
+   * @param afterDate the {@link org.joda.time.DateTime} instance representing the after date
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setAddedAfterDate(DateTime afterDate);
 
-    /**
-     * Sets the size of the expected response
-     * @param size the number of documents
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setReturnSetSize(long size);
+  /**
+   * Sets the date before which documents should be returned from Sysomos.
+   * @param beforeDate the {@link org.joda.time.DateTime} instance representing the before date
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setAddedBeforeDate(DateTime beforeDate);
 
-    /**
-     * Sets the starting offset for the number of documents given the other parameters
-     * @param offset the starting offset
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setOffset(long offset);
+  /**
+   * Sets the size of the expected response.
+   * @param size the number of documents
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setReturnSetSize(long size);
 
-    /**
-     * Sets the Sysomos Heartbeat ID
-     * @param hid Heartbeat ID
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setHeartBeatId(int hid);
+  /**
+   * Sets the starting offset for the number of documents given the other parameters.
+   * @param offset the starting offset
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setOffset(long offset);
 
-    /**
-     *
-     * Sets the Sysomos Heartbeat ID as a String
-     * @param hid Heartbeat ID string
-     *
-     * @return The RequestBuilder for continued Chaining
-     */
-    RequestBuilder setHeartBeatId(String hid);
+  /**
+   * Sets the Sysomos Heartbeat ID.
+   * @param hid Heartbeat ID
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setHeartBeatId(int hid);
 
-    /**
-     * Returns the full url need to execute a request.
-     *
-     * Example:
-     * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
-     * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
-     * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
-     *
-     * @return the URL to use when making requests of Sysomos Heartbeat
-     */
-    URL getRequestUrl();
+  /**
+   * Sets the Sysomos Heartbeat ID as a String.
+   * @param hid Heartbeat ID string
+   *
+   * @return The RequestBuilder for continued Chaining
+   */
+  RequestBuilder setHeartBeatId(String hid);
 
-    /**
-     * Executes the request to the Sysomos Heartbeat API and returns a valid response
-     */
-    BeatApi.BeatResponse execute();
+  /**
+   * Returns the full URL needed to execute a request.
+   *
+   * <p>
+   * Example:
+   * http://api.sysomos.com/dev/v1/heartbeat/content?apiKey=YOUR
+   * -APIKEY&hid=YOUR-HEARTBEAT-ID&offset=0&size=10&
+   * addedAfter=2010-10-15T13:00:00Z&addedBefore=2010-10-18T13:00:00Z
+   *
+   * @return the URL to use when making requests of Sysomos Heartbeat
+   */
+  URL getRequestUrl();
+
+  /**
+   * Executes the request to the Sysomos Heartbeat API and returns a valid response.
+   */
+  BeatApi.BeatResponse execute();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
index 488b6c7..6b59d1e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
@@ -30,26 +30,19 @@ import java.net.URL;
  */
 public class SysomosClient {
 
-    public static final String BASE_URL_STRING = "http://api.sysomos.com/";
-    private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
+  public static final String BASE_URL_STRING = "http://api.sysomos.com/";
+  private static final String HEARTBEAT_INFO_URL = "http://api.sysomos.com/v1/heartbeat/info?apiKey={api_key}&hid={hid}";
 
-    private String apiKey;
+  private String apiKey;
 
-    private HttpURLConnection client;
+  private HttpURLConnection client;
 
-    public SysomosClient(String apiKey) {
-        this.apiKey = apiKey;
-    }
+  public SysomosClient(String apiKey) {
+    this.apiKey = apiKey;
+  }
 
-    public HeartbeatInfo getHeartbeatInfo(String hid) throws Exception {
-        String urlString = HEARTBEAT_INFO_URL.replace("{api_key}", this.apiKey);
-        urlString = urlString.replace("{hid}", hid);
-        String xmlResponse = SysomosUtils.queryUrl(new URL(urlString));
-        return new HeartbeatInfo(xmlResponse);
-    }
-
-    public RequestBuilder createRequestBuilder() {
-        return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
-    }
+  public RequestBuilder createRequestBuilder() {
+    return new ContentRequestBuilder(BASE_URL_STRING, this.apiKey);
+  }
 
 }