You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/11/26 18:08:04 UTC

[3/7] incubator-streams git commit: STREAMS-441: Remove compile dependency on guava for core packages, this closes apache/incubator-streams#320

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 9de1863..81b4f79 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -25,7 +25,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.RssStreamConfiguration;
 import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
 import org.apache.streams.util.ComponentUtils;
@@ -47,11 +46,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -65,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class RssStreamProvider implements StreamsProvider {
 
-  public static final String STREAMS_ID = "RssStreamProvider";
+  private static final String STREAMS_ID = "RssStreamProvider";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
 
@@ -142,7 +137,7 @@ public class RssStreamProvider implements StreamsProvider {
 
   @Override
   public void prepare(Object configurationObject) {
-    this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
     this.dataQueue = new LinkedBlockingQueue<>();
     this.scheduler = getScheduler(this.dataQueue);
     this.isComplete = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
index e4bfd35..975749f 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
@@ -22,10 +22,10 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.provider.RssStreamProviderTask;
 
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -64,7 +64,7 @@ public class RssFeedScheduler implements Runnable {
     this.feedDetailsList = feedDetailsList;
     this.peroid = peroid;
     this.keepRunning = new AtomicBoolean(true);
-    this.lastScheduled = Maps.newHashMap();
+    this.lastScheduled = new HashMap<>();
     this.dataQueue = dataQueue;
     this.complete = new AtomicBoolean(false);
   }
@@ -116,7 +116,7 @@ public class RssFeedScheduler implements Runnable {
       }
       if (currentTime - lastTime > pollInterval) {
         this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
-        this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
+        LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
         this.lastScheduled.put(detail.getUrl(), currentTime);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index 1e3aedd..45d5d96 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -38,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 
 public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
 
@@ -84,7 +84,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod
    * @return Activity
    */
   public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
-    Preconditions.checkNotNull(entry);
+    Objects.requireNonNull(entry);
 
     Activity activity = new Activity();
     Provider provider = buildProvider(entry);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
index 6868bfc..63920e7 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
@@ -21,7 +21,6 @@ package org.apache.streams.rss.serializer;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import com.sun.syndication.feed.module.Module;
 import com.sun.syndication.feed.rss.Category;
 import com.sun.syndication.feed.rss.Content;
@@ -37,7 +36,6 @@ import org.joda.time.format.ISODateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 
@@ -250,9 +248,7 @@ public class SyndEntrySerializer {
   }
 
   private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
-    if (links == null || links.size() == 0) {
-      return;
-    } else if (links.get(0) instanceof String) {
+    if (links.get(0) instanceof String) {
       serializeListOfStrings(links, "links", root, factory);
     } else if (links.get(0) instanceof SyndLinkImpl) {
       ArrayNode linksArray = factory.arrayNode();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
index 08a58d3..102c3db 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
@@ -24,13 +24,13 @@ import org.apache.streams.rss.RssStreamConfiguration;
 import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.google.common.collect.Queues;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Unit tests for {@link org.apache.streams.rss.provider.RssStreamProvider}
@@ -44,7 +44,7 @@ public class RssStreamProviderTest extends RandomizedTest {
     RssStreamProvider provider = null;
     try {
       final CountDownLatch latch = new CountDownLatch(1);
-      BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue();
+      BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
       provider = new RssStreamProvider(new RssStreamConfiguration()) {
         @Override
         protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
index 830f0e7..779c2ab 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
@@ -18,16 +18,13 @@
 
 package org.apache.streams.rss.provider.perpetual;
 
-import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.provider.RssStreamProviderTask;
 
-import com.google.common.collect.Lists;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -51,15 +48,12 @@ public class RssFeedSchedulerTest {
   public void testScheduleFeeds() {
     ExecutorService mockService = mock(ExecutorService.class);
     final List<String> queuedTasks = new ArrayList<>(5);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-        queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
-        return null;
-      }
+    doAnswer(invocationOnMock -> {
+      queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
+      return null;
     }).when(mockService).execute(any(Runnable.class));
 
-    RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1);
+    RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<>(), 1);
     scheduler.scheduleFeeds();
     assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size());
     assertEquals("Expected Feed 1 to be queued first",  "1", queuedTasks.get(0));
@@ -78,7 +72,7 @@ public class RssFeedSchedulerTest {
   }
 
   private List<FeedDetails> createFeedList() {
-    List<FeedDetails> list = Lists.newLinkedList();
+    List<FeedDetails> list = new LinkedList<>();
     FeedDetails fd = new FeedDetails();
     fd.setPollIntervalMillis(1L);
     fd.setUrl("1");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
index ccd8b74..c3f10f9 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.rss.provider.RssStreamProvider;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,13 +42,11 @@ import java.io.InputStreamReader;
 import java.io.LineNumberReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.hamcrest.number.OrderingComparison.greaterThan;
 
-/**
- * Created by sblackmon on 2/5/14.
- */
 public class RssStreamProviderIT {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
@@ -67,7 +64,7 @@ public class RssStreamProviderIT {
     BufferedReader br = new BufferedReader(isr);
 
     RssStreamConfiguration configuration = new RssStreamConfiguration();
-    List<FeedDetails> feedArray = Lists.newArrayList();
+    List<FeedDetails> feedArray = new ArrayList<>();
     try {
       while (br.ready()) {
         String line = br.readLine();
@@ -77,7 +74,6 @@ public class RssStreamProviderIT {
       }
       configuration.setFeeds(feedArray);
     } catch ( Exception ex ) {
-      System.out.println(ex);
       ex.printStackTrace();
       Assert.fail();
     }
@@ -101,7 +97,7 @@ public class RssStreamProviderIT {
     assert (config.canRead());
     assert (config.isFile());
 
-    RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+    RssStreamProvider.main(new String[]{configfile, outfile});
 
     File out = new File(outfile);
     assert (out.exists());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
index 01f1999..37ff7cf 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
@@ -27,7 +27,6 @@ import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
@@ -35,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Scanner;
 
 import static org.junit.Assert.assertEquals;
@@ -54,8 +55,8 @@ public class SyndEntryActivitySerializerIT {
   @Test
   public void testJsonData() throws Exception {
     Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt"));
-    List<Activity> activities = Lists.newLinkedList();
-    List<ObjectNode> objects = Lists.newLinkedList();
+    List<Activity> activities = new LinkedList<>();
+    List<ObjectNode> objects = new LinkedList<>();
 
     SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
 
@@ -125,7 +126,8 @@ public class SyndEntryActivitySerializerIT {
   }
 
   public void testUrl(String expectedUri, String expectedLink, Activity activity) {
-    assertTrue((expectedUri == activity.getUrl() || expectedLink == activity.getUrl()));
-    assertTrue((expectedUri == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl()));
+    assertTrue((Objects.equals(expectedUri, activity.getUrl()) || Objects.equals(expectedLink, activity.getUrl())));
+    assertTrue((Objects.equals(expectedUri, activity.getObject().getUrl()) ||
+      Objects.equals(expectedLink, activity.getObject().getUrl())));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
index 6b59d1e..95d8ad9 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
@@ -19,11 +19,7 @@
 
 package org.apache.streams.sysomos.provider;
 
-import org.apache.streams.sysomos.data.HeartbeatInfo;
-import org.apache.streams.sysomos.util.SysomosUtils;
-
 import java.net.HttpURLConnection;
-import java.net.URL;
 
 /**
  * Wrapper for the Sysomos API.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index f555e8d..915a8cf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -30,12 +30,12 @@ import org.apache.streams.twitter.pojo.UserstreamEvent;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT;
 
@@ -47,9 +47,9 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
   @Override
   public List<Class> detectClasses(Object document) {
 
-    Preconditions.checkNotNull(document);
+    Objects.requireNonNull(document);
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TWITTER_FORMAT));
 
     ObjectNode objectNode;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
index b8ce79b..9ca354f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
@@ -23,16 +23,17 @@ import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.User;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
 public class TwitterJsonUserActivityConverter implements ActivityConverter<User> {
 
-  public static Class requiredClass = User.class;
+  private static Class requiredClass = User.class;
 
   @Override
   public Class requiredClass() {
@@ -67,12 +68,12 @@ public class TwitterJsonUserActivityConverter implements ActivityConverter<User>
     Activity activity = new Activity();
     updateActivity(user, activity);
 
-    return Lists.newArrayList(activity);
+    return Collections.singletonList(activity);
   }
 
   @Override
   public List<Activity> toActivityList(List<User> serializedList) {
-    List<Activity> result = Lists.newArrayList();
+    List<Activity> result = new ArrayList<>();
     for ( User item : serializedList ) {
       try {
         List<Activity> activities = toActivityList(item);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index e0e2e80..3344d05 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -32,15 +32,14 @@ import org.apache.streams.twitter.pojo.Entities;
 import org.apache.streams.twitter.pojo.Hashtag;
 import org.apache.streams.twitter.pojo.Place;
 import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.TargetObject;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.twitter.pojo.UserMentions;
-import org.apache.streams.twitter.provider.TwitterErrorHandler;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
@@ -50,6 +49,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.math.DoubleMath.mean;
 
@@ -71,13 +71,11 @@ public class TwitterActivityUtil {
   public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
     activity.setActor(buildActor(tweet));
     activity.setId(formatId(activity.getVerb(),
-        Optional.fromNullable(
-            tweet.getIdStr())
-            .or(Optional.of(tweet.getId().toString()))
-            .orNull()));
+        Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+            .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null)));
 
     if (tweet instanceof Retweet) {
-      updateActivityContent(activity,  ((Retweet) tweet).getRetweetedStatus(), "share");
+      updateActivityContent(activity,  (tweet).getRetweetedStatus(), "share");
     } else {
       updateActivityContent(activity, tweet, "post");
     }
@@ -127,7 +125,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
+   * Builds the activity {@link ActivityObject} actor from the tweet
    * @param tweet the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
@@ -139,17 +137,15 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
+   * Builds the activity {@link ActivityObject} actor from the User
    * @param user the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
   public static ActivityObject buildActor(User user) {
     ActivityObject actor = new ActivityObject();
     actor.setId(formatId(
-        Optional.fromNullable(
-            user.getIdStr())
-            .or(Optional.of(user.getId().toString()))
-            .orNull()
+        Optional.ofNullable(Optional.ofNullable(user.getIdStr())
+            .orElseGet(Optional.of(user.getId().toString())::get)).orElse(null)
     ));
     actor.setObjectType("page");
     actor.setDisplayName(user.getName());
@@ -189,16 +185,14 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
+   * Creates an {@link ActivityObject} for the tweet
    * @param tweet the object to use as the source
    * @return a valid ActivityObject
    */
   public static ActivityObject buildActivityObject(Tweet tweet) {
     ActivityObject actObj = new ActivityObject();
-    String id =  Optional.fromNullable(
-        tweet.getIdStr())
-        .or(Optional.of(tweet.getId().toString()))
-        .orNull();
+    String id = Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+        .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null);
     if ( id != null ) {
       actObj.setId(id);
     }
@@ -261,7 +255,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet.
+   * Builds the {@link TargetObject} from the tweet.
    * @param tweet the object to use as the source
    * @return currently returns null for all activities
    */
@@ -278,17 +272,15 @@ public class TwitterActivityUtil {
     Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
     Map<String, Object> location = new HashMap<>();
     location.put("id", formatId(
-        Optional.fromNullable(
-            tweet.getIdStr())
-            .or(Optional.of(tweet.getId().toString()))
-            .orNull()
+        Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+            .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null)
     ));
     location.put("coordinates", boundingBoxCenter(tweet.getPlace()));
     extensions.put("location", location);
   }
 
   /**
-   * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object
+   * Gets the common twitter {@link Provider} object
    * @return a provider object representing Twitter
    */
   public static Provider getProvider() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index ee800fa..764b3ee 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -33,6 +33,8 @@ import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterObjectFactory;
 
+import java.util.Objects;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
@@ -43,11 +45,11 @@ public class TwitterFollowingProviderTask implements Runnable {
   private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
   protected TwitterFollowingProvider provider;
-  protected Twitter client;
+  private Twitter client;
   protected Long id;
-  protected String screenName;
+  private String screenName;
 
-  int count = 0;
+  private int count = 0;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -81,7 +83,7 @@ public class TwitterFollowingProviderTask implements Runnable {
 
     if ( id != null ) {
       getFollowing(id);
-    } else if ( screenName != null) {
+    } else {
       getFollowing(screenName);
     }
 
@@ -89,7 +91,7 @@ public class TwitterFollowingProviderTask implements Runnable {
 
   }
 
-  protected void getFollowing(Long id) {
+  private void getFollowing(Long id) {
 
     Preconditions.checkArgument(
         provider.getConfig().getEndpoint().equals("friends")
@@ -103,7 +105,7 @@ public class TwitterFollowingProviderTask implements Runnable {
     }
   }
 
-  protected void getFollowing(String screenName) {
+  private void getFollowing(String screenName) {
 
     twitter4j.User user = null;
     try {
@@ -111,7 +113,7 @@ public class TwitterFollowingProviderTask implements Runnable {
     } catch (TwitterException ex) {
       LOGGER.error("Failure looking up " + id);
     }
-    Preconditions.checkNotNull(user);
+    Objects.requireNonNull(user);
     getFollowing(user.getId());
   }
 
@@ -134,12 +136,12 @@ public class TwitterFollowingProviderTask implements Runnable {
 
         PagableResponseList<twitter4j.User> list = null;
         if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+          list = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getMaxItems().intValue());
         } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+          list = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getMaxItems().intValue());
         }
 
-        Preconditions.checkNotNull(list);
+        Objects.requireNonNull(list);
         Preconditions.checkArgument(list.size() > 0);
 
         for (twitter4j.User other : list) {
@@ -176,8 +178,6 @@ public class TwitterFollowingProviderTask implements Runnable {
           break;
         }
         curser = list.getNextCursor();
-      } catch (TwitterException twitterException) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
       } catch (Exception ex) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
       }
@@ -194,12 +194,12 @@ public class TwitterFollowingProviderTask implements Runnable {
       try {
         twitter4j.IDs ids = null;
         if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+          ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue());
         } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+          ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue());
         }
 
-        Preconditions.checkNotNull(ids);
+        Objects.requireNonNull(ids);
         Preconditions.checkArgument(ids.getIDs().length > 0);
 
         for (long otherId : ids.getIDs()) {
@@ -216,7 +216,7 @@ public class TwitterFollowingProviderTask implements Runnable {
                   .withFollower(new User().withId(id));
             }
 
-            Preconditions.checkNotNull(follow);
+            Objects.requireNonNull(follow);
 
             if ( count < provider.getConfig().getMaxItems()) {
               ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
index a4562ef..2be77ac 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
@@ -24,7 +24,6 @@ import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,7 +99,9 @@ public class TwitterStreamHelper extends StringDelimitedProcessor {
         Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0);
         Object document = mapper.readValue(item, itemClass);
         StreamsDatum rawDatum = new StreamsDatum(document);
-        return Lists.newArrayList(rawDatum);
+        List<StreamsDatum> streamsDatumList = new ArrayList<>();
+        streamsDatumList.add(rawDatum);
+        return streamsDatumList;
       }
       return new ArrayList<>();
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 1895ee2..a37ec4d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -35,8 +35,6 @@ import org.apache.streams.util.ComponentUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
@@ -67,25 +65,24 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * TwitterStreamProvider wraps a hosebird client and passes recieved documents
+ * TwitterStreamProvider wraps a hosebird client and passes received documents
  * to subscribing components.
  */
 public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable {
 
-  public static final String STREAMS_ID = "TwitterStreamProvider";
+  private static final String STREAMS_ID = "TwitterStreamProvider";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
 
@@ -127,9 +124,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
     TwitterStreamProvider provider = new TwitterStreamProvider(config);
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
-    PrintStream outStream = null;
+    PrintStream outStream;
     try {
       outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
     } catch (FileNotFoundException ex) {
@@ -140,9 +137,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     provider.startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = mapper.writeValueAsString(datum.getDocument());
@@ -157,7 +152,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     outStream.flush();
   }
 
-  public static final int MAX_BATCH = 1000;
+  private static final int MAX_BATCH = 1000;
 
   private TwitterStreamConfiguration config;
 
@@ -169,13 +164,12 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     this.config = config;
   }
 
-  protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
+  private volatile Queue<Future<List<StreamsDatum>>> providerQueue;
 
-  protected Hosts hosebirdHosts;
-  protected Authentication auth;
+  private Authentication auth;
   protected StreamingEndpoint endpoint;
-  protected BasicClient client;
-  protected AtomicBoolean running = new AtomicBoolean(false);
+  private BasicClient client;
+  private AtomicBoolean running = new AtomicBoolean(false);
   protected TwitterStreamHelper processor = new TwitterStreamHelper(this);
   private DatumStatusCounter countersCurrent = new DatumStatusCounter();
   private DatumStatusCounter countersTotal = new DatumStatusCounter();
@@ -204,7 +198,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     StreamsResultSet current;
     synchronized (this) {
-      Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+      Queue<StreamsDatum> drain = new LinkedBlockingDeque<>();
       drainTo(drain);
       current = new StreamsResultSet(drain);
       current.setCounter(new DatumStatusCounter());
@@ -234,8 +228,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
   @Override
   public void prepare(Object configurationObject) {
 
-    Preconditions.checkNotNull(config.getEndpoint());
+    Objects.requireNonNull(config.getEndpoint());
 
+    Hosts hosebirdHosts;
     if (config.getEndpoint().equals("userstream") ) {
 
       hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
@@ -276,8 +271,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     if ( config.getBasicauth() != null ) {
 
-      Preconditions.checkNotNull(config.getBasicauth().getUsername());
-      Preconditions.checkNotNull(config.getBasicauth().getPassword());
+      Objects.requireNonNull(config.getBasicauth().getUsername());
+      Objects.requireNonNull(config.getBasicauth().getPassword());
 
       auth = new BasicAuth(
           config.getBasicauth().getUsername(),
@@ -286,10 +281,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     } else if ( config.getOauth() != null ) {
 
-      Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-      Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-      Preconditions.checkNotNull(config.getOauth().getAccessToken());
-      Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+      Objects.requireNonNull(config.getOauth().getConsumerKey());
+      Objects.requireNonNull(config.getOauth().getConsumerSecret());
+      Objects.requireNonNull(config.getOauth().getAccessToken());
+      Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
 
       auth = new OAuth1(config.getOauth().getConsumerKey(),
           config.getOauth().getConsumerSecret(),

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 3210f80..214d204 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -35,7 +35,6 @@ import org.apache.streams.util.ComponentUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -47,7 +46,6 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
-import twitter4j.TwitterException;
 import twitter4j.TwitterFactory;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
@@ -59,8 +57,10 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -78,9 +78,9 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
  */
 public class TwitterUserInformationProvider implements StreamsProvider, Serializable {
 
-  public static final String STREAMS_ID = "TwitterUserInformationProvider";
+  private static final String STREAMS_ID = "TwitterUserInformationProvider";
 
-  private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+  private static ObjectMapper MAPPER = new StreamsJacksonMapper(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
 
@@ -133,9 +133,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     provider.startStream();
     do {
       Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = MAPPER.writeValueAsString(datum.getDocument());
@@ -176,7 +174,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
   public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   /**
@@ -207,13 +205,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
       config = (TwitterUserInformationConfiguration) configurationObject;
     }
 
-    Preconditions.checkNotNull(config);
-    Preconditions.checkNotNull(config.getOauth());
-    Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-    Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-    Preconditions.checkNotNull(config.getOauth().getAccessToken());
-    Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-    Preconditions.checkNotNull(config.getInfo());
+    Objects.requireNonNull(config);
+    Objects.requireNonNull(config.getOauth());
+    Objects.requireNonNull(config.getOauth().getConsumerKey());
+    Objects.requireNonNull(config.getOauth().getConsumerSecret());
+    Objects.requireNonNull(config.getOauth().getAccessToken());
+    Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
+    Objects.requireNonNull(config.getInfo());
 
     try {
       lock.writeLock().lock();
@@ -222,13 +220,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
       lock.writeLock().unlock();
     }
 
-    Preconditions.checkNotNull(providerQueue);
+    Objects.requireNonNull(providerQueue);
 
-    List<String> screenNames = new ArrayList<String>();
-    List<String[]> screenNameBatches = new ArrayList<String[]>();
+    List<String> screenNames = new ArrayList<>();
+    List<String[]> screenNameBatches = new ArrayList<>();
 
-    List<Long> ids = new ArrayList<Long>();
-    List<Long[]> idsBatches = new ArrayList<Long[]>();
+    List<Long> ids = new ArrayList<>();
+    List<Long[]> idsBatches = new ArrayList<>();
 
     for (String s : config.getInfo()) {
       if (s != null) {
@@ -248,14 +246,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
           // add the batch
           idsBatches.add(ids.toArray(new Long[ids.size()]));
           // reset the Ids
-          ids = new ArrayList<Long>();
+          ids = new ArrayList<>();
         }
 
         if (screenNames.size() >= 100) {
           // add the batch
           screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
           // reset the Ids
-          screenNames = new ArrayList<String>();
+          screenNames = new ArrayList<>();
         }
       }
     }
@@ -275,7 +273,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
       executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
     }
 
-    Preconditions.checkNotNull(executor);
+    Objects.requireNonNull(executor);
 
     this.idsBatches = idsBatches.iterator();
     this.screenNameBatches = screenNameBatches.iterator();
@@ -284,7 +282,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
   @Override
   public void startStream() {
 
-    Preconditions.checkNotNull(executor);
+    Objects.requireNonNull(executor);
 
     Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
 
@@ -327,8 +325,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
           }
         }
         keepTrying = 10;
-      } catch (TwitterException twitterException) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
       } catch (Exception ex) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
       }
@@ -353,8 +349,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
           }
         }
         keepTrying = 10;
-      } catch (TwitterException twitterException) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
       } catch (Exception ex) {
         keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
       }
@@ -383,7 +377,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
   }
 
   protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
+    return new LinkedBlockingQueue<>();
   }
 
   public StreamsResultSet readNew(BigInteger sequence) {
@@ -397,8 +391,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     this.start = start;
     this.end = end;
     readCurrent();
-    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-    return result;
+    return (StreamsResultSet)providerQueue.iterator();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
index 6e269e5..d8967d1 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.Video;
-import com.google.common.collect.Lists;
 import com.youtube.serializer.YoutubeActivityUtil;
 import com.youtube.serializer.YoutubeChannelDeserializer;
 import com.youtube.serializer.YoutubeEventClassifier;
@@ -35,6 +34,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 
@@ -47,7 +47,6 @@ public class YoutubeTypeConverter implements StreamsProcessor {
   private StreamsJacksonMapper mapper;
   private Queue<Video> inQueue;
   private Queue<StreamsDatum> outQueue;
-  private YoutubeActivityUtil youtubeActivityUtil;
   private int count = 0;
 
   public YoutubeTypeConverter() {}
@@ -65,7 +64,7 @@ public class YoutubeTypeConverter implements StreamsProcessor {
       Object item = streamsDatum.getDocument();
 
       LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-      Activity activity = null;
+      Activity activity;
 
       if (item instanceof String) {
         item = deserializeItem(item);
@@ -73,10 +72,10 @@ public class YoutubeTypeConverter implements StreamsProcessor {
 
       if (item instanceof Video) {
         activity = new Activity();
-        youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId());
+        YoutubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId());
       } else if (item instanceof Channel) {
         activity = new Activity();
-        this.youtubeActivityUtil.updateActivity((Channel)item, activity, null);
+        YoutubeActivityUtil.updateActivity((Channel)item, activity, null);
       } else {
         throw new NotImplementedException("Type conversion not implement for type : " + item.getClass().getName());
       }
@@ -90,9 +89,11 @@ public class YoutubeTypeConverter implements StreamsProcessor {
     }
 
     if ( result != null ) {
-      return Lists.newArrayList(result);
+      List<StreamsDatum> streamsDatumList = new ArrayList<>();
+      streamsDatumList.add(result);
+      return streamsDatumList;
     } else {
-      return Lists.newArrayList();
+      return new ArrayList<>();
     }
   }
 
@@ -113,7 +114,6 @@ public class YoutubeTypeConverter implements StreamsProcessor {
 
   @Override
   public void prepare(Object configurationObject) {
-    youtubeActivityUtil = new YoutubeActivityUtil();
     mapper = StreamsJacksonMapper.getInstance();
 
     SimpleModule simpleModule = new SimpleModule();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
index 1442f8b..401b836 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
@@ -38,8 +38,6 @@ import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.client.repackaged.com.google.common.base.Strings;
 import com.google.api.services.youtube.YouTube;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -54,9 +52,11 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
@@ -65,24 +65,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class YoutubeProvider implements StreamsProvider {
 
-  public static final String STREAMS_ID = "YoutubeProvider";
+  private static final String STREAMS_ID = "YoutubeProvider";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class);
   private static final int MAX_BATCH_SIZE = 1000;
 
   // This OAuth 2.0 access scope allows for full read/write access to the
   // authenticated user's account.
-  private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube");
+  private List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube");
 
   /**
    * Define a global instance of the HTTP transport.
    */
-  public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
+  private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
 
   /**
    * Define a global instance of the JSON factory.
    */
-  public static final JsonFactory JSON_FACTORY = new JacksonFactory();
+  private static final JsonFactory JSON_FACTORY = new JacksonFactory();
 
   private static final int DEFAULT_THREAD_POOL_SIZE = 5;
 
@@ -104,7 +104,7 @@ public abstract class YoutubeProvider implements StreamsProvider {
     this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
         .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
 
-    Preconditions.checkNotNull(this.config.getApiKey());
+    Objects.requireNonNull(this.config.getApiKey());
   }
 
   /**
@@ -114,7 +114,7 @@ public abstract class YoutubeProvider implements StreamsProvider {
   public YoutubeProvider(YoutubeConfiguration config) {
     this.config = config;
 
-    Preconditions.checkNotNull(this.config.getApiKey());
+    Objects.requireNonNull(this.config.getApiKey());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
index 4754353..0ca161f 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
@@ -26,13 +26,12 @@ import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Image;
 import org.apache.streams.pojo.json.Provider;
 
-import com.google.api.client.util.Maps;
+import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.Thumbnail;
 import com.google.api.services.youtube.model.ThumbnailDetails;
 import com.google.api.services.youtube.model.Video;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -40,14 +39,15 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class YoutubeActivityUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class);
 
   /**
-   * Given a {@link com.google.api.services.youtube.YouTube.Videos} object and an
-   * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details
+   * Given a {@link YouTube.Videos} object and an
+   * {@link Activity} object, fill out the appropriate details
    *
    * @param video Video
    * @param activity Activity
@@ -57,10 +57,7 @@ public class YoutubeActivityUtil {
     activity.setActor(buildActor(video, video.getSnippet().getChannelId()));
     activity.setVerb("post");
 
-    activity.setId(formatId(activity.getVerb(),
-        Optional.fromNullable(
-            video.getId())
-            .orNull()));
+    activity.setId(formatId(activity.getVerb(), Optional.ofNullable(video.getId()).orElse(null)));
 
     activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue()));
     activity.setTitle(video.getSnippet().getTitle());
@@ -76,8 +73,8 @@ public class YoutubeActivityUtil {
 
 
   /**
-   * Given a {@link com.google.api.services.youtube.model.Channel} object and an
-   * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details
+   * Given a {@link Channel} object and an
+   * {@link Activity} object, fill out the appropriate details
    *
    * @param channel Channel
    * @param activity Activity
@@ -88,7 +85,7 @@ public class YoutubeActivityUtil {
       activity.setProvider(getProvider());
       activity.setVerb("post");
       activity.setActor(createActorForChannel(channel));
-      Map<String, Object> extensions = Maps.newHashMap();
+      Map<String, Object> extensions = new HashMap<>();
       extensions.put("youtube", channel);
       activity.setAdditionalProperty("extensions", extensions);
     } catch (Throwable throwable) {
@@ -111,7 +108,7 @@ public class YoutubeActivityUtil {
     image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl());
     actor.setImage(image);
     actor.setUrl("https://youtube.com/user/" + channel.getId());
-    Map<String, Object> actorExtensions = Maps.newHashMap();
+    Map<String, Object> actorExtensions = new HashMap<>();
     actorExtensions.put("followers", channel.getStatistics().getSubscriberCount());
     actorExtensions.put("posts", channel.getStatistics().getVideoCount());
     actor.setAdditionalProperty("extensions", actorExtensions);
@@ -163,7 +160,7 @@ public class YoutubeActivityUtil {
   }
 
   /**
-   * Build an {@link org.apache.streams.pojo.json.ActivityObject} actor given the video object
+   * Build an {@link ActivityObject} actor given the video object
    * @param video Video
    * @param id id
    * @return Actor object
@@ -180,7 +177,7 @@ public class YoutubeActivityUtil {
   }
 
   /**
-   * Gets the common youtube {@link org.apache.streams.pojo.json.Provider} object
+   * Gets the common youtube {@link Provider} object
    * @return a provider object representing YouTube
    */
   public static Provider getProvider() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
index ea9a49d..e28b4a1 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
@@ -20,7 +20,6 @@
 package com.youtube.serializer;
 
 import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -33,10 +32,9 @@ import com.google.api.services.youtube.model.ChannelStatistics;
 import com.google.api.services.youtube.model.ChannelTopicDetails;
 import com.google.api.services.youtube.model.Thumbnail;
 import com.google.api.services.youtube.model.ThumbnailDetails;
-import com.google.common.collect.Lists;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -45,7 +43,7 @@ import java.util.List;
 public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
 
   @Override
-  public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+  public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
     JsonNode node = jp.getCodec().readTree(jp);
     try {
       Channel channel = new Channel();
@@ -144,10 +142,9 @@ public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
     if (node == null) {
       return details;
     }
-    List<String> topicIds = Lists.newLinkedList();
-    Iterator<JsonNode> it = node.get("topicIds").iterator();
-    while (it.hasNext()) {
-      topicIds.add(it.next().asText());
+    List<String> topicIds = new LinkedList<>();
+    for (JsonNode jsonNode : node.get("topicIds")) {
+      topicIds.add(jsonNode.asText());
     }
     details.setTopicIds(topicIds);
     return  details;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
index 65a454c..e7645bd 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 
 import java.io.IOException;
+import java.util.Objects;
 
 public class YoutubeEventClassifier {
   private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -39,7 +40,7 @@ public class YoutubeEventClassifier {
    * @return Class
    */
   public static Class detectClass(String json) {
-    Preconditions.checkNotNull(json);
+    Objects.requireNonNull(json);
     Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
     ObjectNode objectNode;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
index 4751f00..0ae4822 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
@@ -27,13 +27,13 @@ import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrate
 import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Channel;
 import com.google.api.services.youtube.model.ChannelListResponse;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.junit.Test;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -51,7 +51,7 @@ public class YoutubeChannelDataCollectorTest {
   @Test
   public void testDataCollector() throws Exception {
     YouTube youTube = createMockYoutube();
-    BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue();
+    BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
     BackOffStrategy strategy = new LinearTimeBackOffStrategy(1);
     UserInfo userInfo = new UserInfo();
     userInfo.setUserId(ID);
@@ -91,7 +91,7 @@ public class YoutubeChannelDataCollectorTest {
 
   private ChannelListResponse createMockResponse() {
     ChannelListResponse response = new ChannelListResponse();
-    List<Channel> channelList = Lists.newLinkedList();
+    List<Channel> channelList = new LinkedList<>();
     response.setItems(channelList);
     Channel channel = new Channel();
     channel.setId(ID);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
index 5a2af8a..54c0375 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
@@ -23,15 +23,15 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
 import com.google.api.services.youtube.YouTube;
-import com.google.common.collect.Lists;
 import org.apache.youtube.pojo.YoutubeConfiguration;
 import org.joda.time.DateTime;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -54,7 +54,7 @@ public class YoutubeProviderTest {
   public void testDataCollectorRunsPerUser() {
     Random random = new Random(System.currentTimeMillis());
     int numUsers = random.nextInt(1000);
-    List<UserInfo> userList = Lists.newLinkedList();
+    List<UserInfo> userList = new LinkedList<>();
 
     for ( int i = 0; i < numUsers; ++i ) {
       userList.add(new UserInfo());
@@ -107,7 +107,7 @@ public class YoutubeProviderTest {
     provider.setDefaultAfterDate(afterDate);
     provider.setDefaultBeforeDate(beforeDate);
 
-    Set<String> users = Sets.newHashSet();
+    Set<String> users = new HashSet<>();
     users.add("test_user_1");
     users.add("test_user_2");
     users.add("test_user_3");
@@ -128,7 +128,7 @@ public class YoutubeProviderTest {
     config.setApiKey("API_KEY");
     YoutubeProvider provider = buildProvider(config);
 
-    Map<String, DateTime> users = Maps.newHashMap();
+    Map<String, DateTime> users = new HashMap<>();
     users.put("user1", new DateTime(System.currentTimeMillis()));
     users.put("user3", new DateTime(System.currentTimeMillis()));
     users.put("user4", new DateTime(System.currentTimeMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
index 1870c14..d6a540e 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
@@ -24,7 +24,6 @@ import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 
-import com.google.api.client.util.Lists;
 import com.google.api.services.youtube.YouTube;
 import com.google.api.services.youtube.model.Activity;
 import com.google.api.services.youtube.model.ActivityContentDetails;
@@ -41,6 +40,8 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 
@@ -170,7 +171,7 @@ public class YoutubeUserActivityCollectorTest {
 
   private ActivityListResponse buildActivityListResponse(int num) {
     ActivityListResponse activityListResponse = new ActivityListResponse();
-    List<Activity> items = Lists.newArrayList();
+    List<Activity> items = new ArrayList<>();
 
     for ( int x = 0; x < num; x++ ) {
       Activity activity = new Activity();
@@ -193,21 +194,15 @@ public class YoutubeUserActivityCollectorTest {
 
   private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) {
 
-    YouTube youtube = createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
+    return createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
 
-    return youtube;
   }
 
   private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before) {
     YouTube youtube = mock(YouTube.class);
 
     final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before);
-    doAnswer(new Answer() {
-      @Override
-      public YouTube.Videos answer(InvocationOnMock invocationOnMock) throws Throwable {
-        return videos;
-      }
-    }).when(youtube).videos();
+    doAnswer(invocationOnMock -> videos).when(youtube).videos();
 
     return youtube;
   }
@@ -245,7 +240,7 @@ public class YoutubeUserActivityCollectorTest {
 
   private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange,  DateTime after, DateTime before, boolean page) {
     VideoListResponse feed = new VideoListResponse();
-    List<Video> list = com.google.common.collect.Lists.newLinkedList();
+    List<Video> list = new LinkedList<>();
 
     for (int i = 0; i < numAfter; ++i) {
       com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000);
@@ -255,7 +250,7 @@ public class YoutubeUserActivityCollectorTest {
       list.add(video);
     }
     for (int i = 0; i < numInRange; ++i) {
-      DateTime published = null;
+      DateTime published;
       if ((before == null && after == null) || before == null) {
         published = DateTime.now(); // no date range or end time date range so just make the time now.
       } else if (after == null) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
index 1052647..0920829 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
@@ -40,9 +40,9 @@ import java.math.BigInteger;
  */
 public interface StreamBuilder extends Serializable {
 
-  public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
+  StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
 
-  public StreamsConfiguration getStreamsConfiguration();
+  StreamsConfiguration getStreamsConfiguration();
 
   /**
    * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
@@ -53,7 +53,7 @@ public interface StreamBuilder extends Serializable {
    *                     receive data from.
    * @return this
    */
-  public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+  StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
 
   /**
    * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
@@ -64,7 +64,7 @@ public interface StreamBuilder extends Serializable {
    *                     receive data from.
    * @return this
    */
-  public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+  StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
 
   /**
    * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
@@ -73,7 +73,7 @@ public interface StreamBuilder extends Serializable {
    * @param provider provider to execute
    * @return this
    */
-  public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
+  StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
 
   /**
    * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
@@ -82,7 +82,7 @@ public interface StreamBuilder extends Serializable {
    * @param provider provider to execute
    * @return this
    */
-  public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+  StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
 
   /**
    * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
@@ -92,7 +92,7 @@ public interface StreamBuilder extends Serializable {
    * @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} method
    * @return this
    */
-  public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+  StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
 
   /**
    * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
@@ -104,25 +104,17 @@ public interface StreamBuilder extends Serializable {
    * @param end end date
    * @return this
    */
-  public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+  StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
 
   /**
    * Builds the stream, and starts it or submits it based on implementation.
    */
-  public void start();
+  void start();
 
   /**
    * Stops the streams processing.  No guarantee on a smooth shutdown. Optional method, may not be implemented in
    * all cases.
    */
-  public void stop();
-
-
-
-
-
-
-
-
+  void stop();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
index 490f454..a86499b 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
@@ -28,19 +28,19 @@ public interface StreamsOperation extends Serializable {
   /**
    * Each operation must publish an identifier.
    */
-  public String getId();
+  String getId();
 
   /**
    * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
    * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type
    *                            will be based on where the operation is being run (ie. hadoop, storm, locally, etc.)
    */
-  public void prepare(Object configurationObject);
+  void prepare(Object configurationObject);
 
   /**
    * No guarantee that this method will ever be called.  But upon shutdown of the stream, an attempt to call this method
    * will be made.
    * Use this method to terminate connections, etc.
    */
-  public void cleanUp();
+  void cleanUp();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index a797ce5..5b85e68 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -40,11 +40,11 @@ import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 
 import java.lang.management.ManagementFactory;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -128,7 +128,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
     LOGGER.info("BroadcastMonitorThread running");
     while (keepRunning) {
       try {
-        List<String> messages = Lists.newArrayList();
+        List<String> messages = new ArrayList<>();
         Set<ObjectName> beans = server.queryNames(null, null);
 
         for (ObjectName name : beans) {
@@ -185,13 +185,16 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
       }
 
       if (broadcastUri != null) {
-        if (broadcastUri.getScheme().equals("http")) {
-          messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
-        } else if (broadcastUri.getScheme().equals("udp")) {
-          messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
-        } else {
-          LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
-          throw new RuntimeException();
+        switch (broadcastUri.getScheme()) {
+          case "http":
+            messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
+            break;
+          case "udp":
+            messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
+            break;
+          default:
+            LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
+            throw new RuntimeException();
         }
       } else {
         messagePersister = new Slf4jMessagePersister();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
index 3f9a4c1..c85ff90 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.SocketException;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -37,7 +38,7 @@ public class LogstashUdpMessagePersisterTest {
 
   private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
 
-  DatagramSocket socket = null;
+  private DatagramSocket socket = null;
 
   /**
    * setup.
@@ -56,7 +57,7 @@ public class LogstashUdpMessagePersisterTest {
   public void testFailedPersist() {
     LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
 
-    List<String> messageArray = Lists.newArrayList();
+    List<String> messageArray = new ArrayList<>();
     for (int x = 0; x < 10; x ++) {
       messageArray.add("Fake_message #" + x);
     }