You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:01 UTC

[20/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 66c1104..2527d29 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -18,28 +18,26 @@
 
 package org.apache.streams.twitter.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterFollowingConfiguration;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
@@ -51,162 +49,184 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * Created by sblackmon on 11/25/14.
+ * Retrieve all follow adjacencies from a list of user ids or names.
  */
 public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
-    public static final String STREAMS_ID = "TwitterFollowingProvider";
-    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private TwitterFollowingConfiguration config;
-
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-    public static void main(String[] args) throws Exception {
+  public static final String STREAMS_ID = "TwitterFollowingProvider";
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private TwitterFollowingConfiguration config;
+
+  List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in application.conf:
+   *
+   * <p/>
+   * twitter.oauth.consumerKey
+   * twitter.oauth.consumerSecret
+   * twitter.oauth.accessToken
+   * twitter.oauth.accessTokenSecret
+   * twitter.info
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterFollowingProvider -Dexec.args="application.conf tweets.json"
+   *
+   * @param args path to the configuration file, followed by the path of the output file
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
+    TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+
+    ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
+    }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 
-        Preconditions.checkArgument(args.length >= 2);
+  public TwitterFollowingConfiguration getConfig() {
+    return config;
+  }
 
-        String configfile = args[0];
-        String outfile = args[1];
+  public static final int MAX_NUMBER_WAITING = 10000;
 
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+  public TwitterFollowingProvider() {
+    this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
+  }
 
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+  public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
+    super(config);
+    this.config = config;
+  }
 
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
-        TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
+  @Override
+  public void prepare(Object configurationObject) {
+    super.prepare(config);
+    Preconditions.checkNotNull(getConfig().getEndpoint());
+    Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+    return;
+  }
 
-        ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+  @Override
+  public void startStream() {
 
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
-    }
+    Preconditions.checkNotNull(executor);
 
-    public TwitterFollowingConfiguration getConfig()              { return config; }
+    Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
 
-    public static final int MAX_NUMBER_WAITING = 10000;
+    LOGGER.info("startStream");
 
-    public TwitterFollowingProvider() {
-        this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
-    }
+    running.set(true);
 
-    public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
-        super(config);
-        this.config = config;
+    while (idsBatches.hasNext()) {
+      submitFollowingThreads(idsBatches.next());
     }
-
-    @Override
-    public void prepare(Object o) {
-        super.prepare(config);
-        Preconditions.checkNotNull(getConfig().getEndpoint());
-        Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
-        return;
+    while (screenNameBatches.hasNext()) {
+      submitFollowingThreads(screenNameBatches.next());
     }
 
-    @Override
-    public void startStream() {
+    executor.shutdown();
 
-        Preconditions.checkNotNull(executor);
-
-        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
-        LOGGER.info("startStream");
-
-        running.set(true);
-
-        while (idsBatches.hasNext()) {
-            submitFollowingThreads(idsBatches.next());
-        }
-        while (screenNameBatches.hasNext()) {
-            submitFollowingThreads(screenNameBatches.next());
-        }
+  }
 
-        executor.shutdown();
+  protected void submitFollowingThreads(Long[] ids) {
+    Twitter client = getTwitterClient();
 
+    for (int i = 0; i < ids.length; i++) {
+      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("submitted {}", ids[i]);
     }
+  }
 
-    protected void submitFollowingThreads(Long[] ids) {
-        Twitter client = getTwitterClient();
+  protected void submitFollowingThreads(String[] screenNames) {
+    Twitter client = getTwitterClient();
 
-        for (int i = 0; i < ids.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
-            ListenableFuture future = executor.submit(providerTask);
-            futures.add(future);
-            LOGGER.info("submitted {}", ids[i]);
-        }
+    for (int i = 0; i < screenNames.length; i++) {
+      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("submitted {}", screenNames[i]);
     }
 
-    protected void submitFollowingThreads(String[] screenNames) {
-        Twitter client = getTwitterClient();
-
-        for (int i = 0; i < screenNames.length; i++) {
-            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
-            ListenableFuture future = executor.submit(providerTask);
-            futures.add(future);
-            LOGGER.info("submitted {}", screenNames[i]);
-        }
-
-    }
+  }
 
-    @Override
-    public StreamsResultSet readCurrent() {
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
+    LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
 
-        StreamsResultSet result;
+    StreamsResultSet result;
 
-        try {
-            lock.writeLock().lock();
-            result = new StreamsResultSet(providerQueue);
-            result.setCounter(new DatumStatusCounter());
-            providerQueue = constructQueue();
-            LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
-        } finally {
-            lock.writeLock().unlock();
-        }
+    try {
+      lock.writeLock().lock();
+      result = new StreamsResultSet(providerQueue);
+      result.setCounter(new DatumStatusCounter());
+      providerQueue = constructQueue();
+      LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+    } finally {
+      lock.writeLock().unlock();
+    }
 
-        return result;
+    return result;
 
-    }
+  }
 
-    @Override
-    public boolean isRunning() {
-        if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            running.set(false);
-            LOGGER.info("Exiting");
-        }
-        return running.get();
+  @Override
+  public boolean isRunning() {
+    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      running.set(false);
+      LOGGER.info("Exiting");
     }
+    return running.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index f2346fb..ee800fa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -18,13 +18,14 @@
 
 package org.apache.streams.twitter.provider;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.PagableResponseList;
@@ -37,188 +38,208 @@ import twitter4j.TwitterObjectFactory;
  */
 public class TwitterFollowingProviderTask implements Runnable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected TwitterFollowingProvider provider;
+  protected Twitter client;
+  protected Long id;
+  protected String screenName;
+
+  int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param id numeric id
+   */
+  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
+    this.provider = provider;
+    this.client = twitter;
+    this.id = id;
+  }
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param screenName screenName
+   */
+  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
+    this.provider = provider;
+    this.client = twitter;
+    this.screenName = screenName;
+  }
+
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(id != null || screenName != null);
+
+    if ( id != null ) {
+      getFollowing(id);
+    } else if ( screenName != null) {
+      getFollowing(screenName);
+    }
 
-    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
 
-    protected TwitterFollowingProvider provider;
-    protected Twitter client;
-    protected Long id;
-    protected String screenName;
+  }
 
-    int count = 0;
+  protected void getFollowing(Long id) {
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
-        this.provider = provider;
-        this.client = twitter;
-        this.id = id;
-    }
+    Preconditions.checkArgument(
+        provider.getConfig().getEndpoint().equals("friends")
+        || provider.getConfig().getEndpoint().equals("followers")
+    );
 
-    public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
-        this.provider = provider;
-        this.client = twitter;
-        this.screenName = screenName;
+    if ( provider.getConfig().getIdsOnly() ) {
+      collectIds(id);
+    } else {
+      collectUsers(id);
     }
+  }
 
+  protected void getFollowing(String screenName) {
 
-    @Override
-    public void run() {
+    twitter4j.User user = null;
+    try {
+      user = client.users().showUser(screenName);
+    } catch (TwitterException ex) {
+      LOGGER.error("Failure looking up " + id);
+    }
+    Preconditions.checkNotNull(user);
+    getFollowing(user.getId());
+  }
 
-        Preconditions.checkArgument(id != null || screenName != null);
+  private void collectUsers(Long id) {
+    int keepTrying = 0;
 
-        if( id != null )
-            getFollowing(id);
-        else if( screenName != null)
-            getFollowing(screenName);
+    long curser = -1;
 
-        LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
+    do {
+      try {
+        twitter4j.User user;
+        String userJson;
+        try {
+          user = client.users().showUser(id);
+          userJson = TwitterObjectFactory.getRawJSON(user);
+        } catch (TwitterException ex) {
+          LOGGER.error("Failure looking up " + id);
+          break;
+        }
 
-    }
+        PagableResponseList<twitter4j.User> list = null;
+        if ( provider.getConfig().getEndpoint().equals("followers") ) {
+          list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+          list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+        }
 
-    protected void getFollowing(Long id) {
+        Preconditions.checkNotNull(list);
+        Preconditions.checkArgument(list.size() > 0);
 
-        Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers"));
+        for (twitter4j.User other : list) {
 
-        if( provider.getConfig().getIdsOnly() )
-            collectIds(id);
-        else
-            collectUsers(id);
-    }
+          String otherJson = TwitterObjectFactory.getRawJSON(other);
 
-    private void collectUsers(Long id) {
-        int keepTrying = 0;
-
-        long curser = -1;
-
-        do
-        {
-            try
-            {
-                twitter4j.User user;
-                String userJson;
-                try {
-                    user = client.users().showUser(id);
-                    userJson = TwitterObjectFactory.getRawJSON(user);
-                } catch (TwitterException e) {
-                    LOGGER.error("Failure looking up " + id);
-                    break;
-                }
-
-                PagableResponseList<twitter4j.User> list = null;
-                if( provider.getConfig().getEndpoint().equals("followers") )
-                    list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-                else if( provider.getConfig().getEndpoint().equals("friends") )
-                    list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-
-                Preconditions.checkNotNull(list);
-                Preconditions.checkArgument(list.size() > 0);
-
-                for (twitter4j.User other : list) {
-
-                    String otherJson = TwitterObjectFactory.getRawJSON(other);
-
-                    try {
-                        Follow follow = null;
-                        if( provider.getConfig().getEndpoint().equals("followers") ) {
-                            follow = new Follow()
-                                    .withFollowee(mapper.readValue(userJson, User.class))
-                                    .withFollower(mapper.readValue(otherJson, User.class));
-                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
-                            follow = new Follow()
-                                    .withFollowee(mapper.readValue(otherJson, User.class))
-                                    .withFollower(mapper.readValue(userJson, User.class));
-                        }
-
-                        Preconditions.checkNotNull(follow);
-
-                        if( count < provider.getConfig().getMaxItems()) {
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-                            count++;
-                        }
-
-                    } catch (Exception e) {
-                        LOGGER.warn("Exception: {}", e);
-                    }
-                }
-                if( !list.hasNext() ) break;
-                if( list.getNextCursor() == 0 ) break;
-                curser = list.getNextCursor();
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+          try {
+            Follow follow = null;
+            if ( provider.getConfig().getEndpoint().equals("followers") ) {
+              follow = new Follow()
+                  .withFollowee(mapper.readValue(userJson, User.class))
+                  .withFollower(mapper.readValue(otherJson, User.class));
+            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+              follow = new Follow()
+                  .withFollowee(mapper.readValue(otherJson, User.class))
+                  .withFollower(mapper.readValue(userJson, User.class));
             }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+
+            Preconditions.checkNotNull(follow);
+
+            if ( count < provider.getConfig().getMaxItems()) {
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+              count++;
             }
-        } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+
+          } catch (Exception ex) {
+            LOGGER.warn("Exception: {}", ex);
+          }
+        }
+        if ( !list.hasNext() ) {
+          break;
+        }
+        if ( list.getNextCursor() == 0 ) {
+          break;
+        }
+        curser = list.getNextCursor();
+      } catch (TwitterException twitterException) {
+        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+      } catch (Exception ex) {
+        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
+      }
     }
+    while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+  }
 
-    private void collectIds(Long id) {
-        int keepTrying = 0;
-
-        long curser = -1;
-
-        do
-        {
-            try
-            {
-                twitter4j.IDs ids = null;
-                if( provider.getConfig().getEndpoint().equals("followers") )
-                    ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-                else if( provider.getConfig().getEndpoint().equals("friends") )
-                    ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
-
-                Preconditions.checkNotNull(ids);
-                Preconditions.checkArgument(ids.getIDs().length > 0);
-
-                for (long otherId : ids.getIDs()) {
-
-                    try {
-                        Follow follow = null;
-                        if( provider.getConfig().getEndpoint().equals("followers") ) {
-                            follow = new Follow()
-                                    .withFollowee(new User().withId(id))
-                                    .withFollower(new User().withId(otherId));
-                        } else if( provider.getConfig().getEndpoint().equals("friends") ) {
-                            follow = new Follow()
-                                    .withFollowee(new User().withId(otherId))
-                                    .withFollower(new User().withId(id));
-                        }
-
-                        Preconditions.checkNotNull(follow);
-
-                        if( count < provider.getConfig().getMaxItems()) {
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-                            count++;
-                        }
-                    } catch (Exception e) {
-                        LOGGER.warn("Exception: {}", e);
-                    }
-                }
-                if( !ids.hasNext() ) break;
-                if( ids.getNextCursor() == 0 ) break;
-                curser = ids.getNextCursor();
-            }
-            catch(TwitterException twitterException) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
-            }
-            catch(Exception e) {
-                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+  private void collectIds(Long id) {
+    int keepTrying = 0;
+
+    long curser = -1;
+
+    do {
+      try {
+        twitter4j.IDs ids = null;
+        if ( provider.getConfig().getEndpoint().equals("followers") ) {
+          ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+          ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+        }
+
+        Preconditions.checkNotNull(ids);
+        Preconditions.checkArgument(ids.getIDs().length > 0);
+
+        for (long otherId : ids.getIDs()) {
+
+          try {
+            Follow follow = null;
+            if ( provider.getConfig().getEndpoint().equals("followers") ) {
+              follow = new Follow()
+                  .withFollowee(new User().withId(id))
+                  .withFollower(new User().withId(otherId));
+            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
+              follow = new Follow()
+                  .withFollowee(new User().withId(otherId))
+                  .withFollower(new User().withId(id));
             }
-        } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
-    }
 
-    protected void getFollowing(String screenName) {
+            Preconditions.checkNotNull(follow);
 
-        twitter4j.User user = null;
-        try {
-            user = client.users().showUser(screenName);
-        } catch (TwitterException e) {
-            LOGGER.error("Failure looking up " + id);
+            if ( count < provider.getConfig().getMaxItems()) {
+              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+              count++;
+            }
+          } catch (Exception ex) {
+            LOGGER.warn("Exception: {}", ex);
+          }
+        }
+        if ( !ids.hasNext() ) {
+          break;
         }
-        Preconditions.checkNotNull(user);
-        getFollowing(user.getId());
+        if ( ids.getNextCursor() == 0 ) {
+          break;
+        }
+        curser = ids.getNextCursor();
+      } catch (TwitterException twitterException) {
+        keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
+      } catch (Exception ex) {
+        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
+      }
     }
-
+    while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
index d9f4ec2..48666cb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java
@@ -16,28 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.twitter.provider;
 
 import org.apache.streams.twitter.TwitterConfiguration;
 
 /**
- * Created by sblackmon on 7/26/15.
+ * TwitterProviderUtil contains utilities for Twitter Providers.
  */
 public class TwitterProviderUtil {
 
-    public static String baseUrl(TwitterConfiguration config) {
+  /**
+   * Build the base URL (protocol://host:port/version/) from a TwitterConfiguration.
+   * @param config TwitterConfiguration
+   * @return baseUrl
+   */
+  public static String baseUrl(TwitterConfiguration config) {
 
-        String baseUrl = new StringBuilder()
-                .append(config.getProtocol())
-                .append("://")
-                .append(config.getHost())
-                .append(":")
-                .append(config.getPort())
-                .append("/")
-                .append(config.getVersion())
-                .append("/")
-                .toString();
+    String baseUrl = new StringBuilder()
+        .append(config.getProtocol())
+        .append("://")
+        .append(config.getHost())
+        .append(":")
+        .append(config.getPort())
+        .append("/")
+        .append(config.getVersion())
+        .append("/")
+        .toString();
 
-        return baseUrl;
-    }
+    return baseUrl;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
new file mode 100644
index 0000000..a4562ef
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * TwitterStreamHelper helps with hosebird twitter stream.
+ */
+public class TwitterStreamHelper extends StringDelimitedProcessor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamHelper.class);
+  private static final int DEFAULT_POOL_SIZE = 5;
+
+  private static final TwitterDocumentClassifier TWITTER_DOCUMENT_CLASSIFIER = new TwitterDocumentClassifier();
+
+  private final TwitterStreamProvider provider;
+  private final ExecutorService service;
+
+  public TwitterStreamHelper(TwitterStreamProvider provider) {
+    this(provider, DEFAULT_POOL_SIZE);
+  }
+
+  /**
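+   * Creates a helper whose messages are deserialized on a fixed-size thread pool.
+   * @param provider TwitterStreamProvider that receives the submitted deserialization tasks
+   * @param poolSize number of threads in the deserialization pool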
+   */
+  public TwitterStreamHelper(TwitterStreamProvider provider, int poolSize) {
+    //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
+    super(null);
+    service = Executors.newFixedThreadPool(poolSize);
+    this.provider = provider;
+  }
+
+  @Override
+  public boolean process() throws IOException, InterruptedException {
+    String msg;
+    do {
+      msg = this.processNextMessage();
+      if (msg == null) {
+        Thread.sleep(10);
+      }
+    }
+    while (msg == null);
+
+    //Deserializing a message can take time.  Submit the task to the thread pool to improve throughput
+    return provider.addDatum(service.submit(new StreamDeserializer(msg)));
+  }
+
+  public void cleanUp() {
+    ComponentUtils.shutdownExecutor(service, 1, 30);
+  }
+
+  protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
+
+    protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected String item;
+
+    public StreamDeserializer(String item) {
+      this.item = item;
+    }
+
+    @Override
+    public List<StreamsDatum> call() throws Exception {
+      if (item != null) {
+        Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0);
+        Object document = mapper.readValue(item, itemClass);
+        StreamsDatum rawDatum = new StreamsDatum(document);
+        return Lists.newArrayList(rawDatum);
+      }
+      return new ArrayList<>();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
deleted file mode 100644
index 96df67b..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- *
- */
-public class TwitterStreamProcessor extends StringDelimitedProcessor {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
-    private static final int DEFAULT_POOL_SIZE = 5;
-
-    private final TwitterStreamProvider provider;
-    private final ExecutorService service;
-
-    public TwitterStreamProcessor(TwitterStreamProvider provider) {
-        this(provider, DEFAULT_POOL_SIZE);
-    }
-
-    public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) {
-        //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream
-        super(null);
-        service = Executors.newFixedThreadPool(poolSize);
-        this.provider = provider;
-    }
-
-
-    @Override
-    public boolean process() throws IOException, InterruptedException {
-        String msg;
-        do {
-            msg = this.processNextMessage();
-            if(msg == null) {
-                Thread.sleep(10);
-            }
-        } while(msg == null);
-
-        //Deserializing to an ObjectNode can take time.  Parallelize the task to improve throughput
-        return provider.addDatum(service.submit(new StreamDeserializer(msg)));
-    }
-
-    public void cleanUp() {
-        ComponentUtils.shutdownExecutor(service, 1, 30);
-    }
-
-    protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
-
-        protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        protected String item;
-
-        public StreamDeserializer(String item) {
-            this.item = item;
-        }
-
-        @Override
-        public List<StreamsDatum> call() throws Exception {
-            if(item != null) {
-                Class itemClass = TwitterEventClassifier.detectClass(item);
-                Object document = mapper.readValue(item, itemClass);
-                StreamsDatum rawDatum = new StreamsDatum(document);
-                return Lists.newArrayList(rawDatum);
-            }
-            return new ArrayList<>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 3856935..1895ee2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,6 +18,20 @@
 
 package org.apache.streams.twitter.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.util.ComponentUtils;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -41,19 +55,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,269 +85,282 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable {
 
-    public final static String STREAMS_ID = "TwitterStreamProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
-
-    public static void main(String[] args) {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
-        TwitterStreamProvider provider = new TwitterStreamProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
-
-        PrintStream outStream = null;
+  public static final String STREAMS_ID = "TwitterStreamProvider";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in application.conf:
+   *
+   * <p/>
+   * twitter.oauth.consumerKey
+   * twitter.oauth.consumerSecret
+   * twitter.oauth.accessToken
+   * twitter.oauth.accessTokenSecret
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterStreamProvider -Dexec.args="application.conf tweets.json"
+   *
+   * @param args args[0] is the path of the configuration file, args[1] is the path of the output file
+   */
+  public static void main(String[] args) {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
+    TwitterStreamProvider provider = new TwitterStreamProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+    PrintStream outStream = null;
+    try {
+      outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    } catch (FileNotFoundException ex) {
+      LOGGER.error("FileNotFoundException", ex);
+      return;
+    }
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
         try {
-            outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        } catch (FileNotFoundException e) {
-            LOGGER.error("FileNotFoundException", e);
-            return;
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
         }
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+      }
     }
-
-    public static final int MAX_BATCH = 1000;
-
-    private TwitterStreamConfiguration config;
-
-    public TwitterStreamConfiguration getConfig() {
-        return config;
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
+
+  public static final int MAX_BATCH = 1000;
+
+  private TwitterStreamConfiguration config;
+
+  public TwitterStreamConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(TwitterStreamConfiguration config) {
+    this.config = config;
+  }
+
+  protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
+
+  protected Hosts hosebirdHosts;
+  protected Authentication auth;
+  protected StreamingEndpoint endpoint;
+  protected BasicClient client;
+  protected AtomicBoolean running = new AtomicBoolean(false);
+  protected TwitterStreamHelper processor = new TwitterStreamHelper(this);
+  private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
+  public TwitterStreamProvider() {
+    this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
+  }
+
+  public TwitterStreamProvider(TwitterStreamConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    client.connect();
+    running.set(true);
+  }
+
+  @Override
+  public synchronized StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+    synchronized (this) {
+      Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+      drainTo(drain);
+      current = new StreamsResultSet(drain);
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
     }
 
-    public void setConfig(TwitterStreamConfiguration config) {
-        this.config = config;
-    }
+    return current;
+  }
 
-    protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
-
-    protected Hosts hosebirdHosts;
-    protected Authentication auth;
-    protected StreamingEndpoint endpoint;
-    protected BasicClient client;
-    protected AtomicBoolean running = new AtomicBoolean(false);
-    protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    throw new NotImplementedException();
+  }
 
-    public TwitterStreamProvider() {
-        this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
-    }
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end)  {
+    throw new NotImplementedException();
+  }
 
-    public TwitterStreamProvider(TwitterStreamConfiguration config) {
-        this.config = config;
-    }
+  @Override
+  public boolean isRunning() {
+    return this.running.get() && !client.isDone();
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public void prepare(Object configurationObject) {
 
-    @Override
-    public void startStream() {
-        client.connect();
-        running.set(true);
-    }
+    Preconditions.checkNotNull(config.getEndpoint());
 
-    @Override
-    public synchronized StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-        synchronized(this) {
-            Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
-            drainTo(drain);
-            current = new StreamsResultSet(drain);
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-        }
+    if (config.getEndpoint().equals("userstream") ) {
 
-        return current;
-    }
+      hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        throw new NotImplementedException();
-    }
+      UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
+      userstreamEndpoint.withFollowings(true);
+      userstreamEndpoint.withUser(false);
+      userstreamEndpoint.allReplies(false);
+      endpoint = userstreamEndpoint;
+    } else if (config.getEndpoint().equals("sample") ) {
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end)  {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return this.running.get() && !client.isDone();
-    }
+      hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
 
-    @Override
-    public void prepare(Object o) {
+      boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
+      boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
 
-        Preconditions.checkNotNull(config.getEndpoint());
-
-        if(config.getEndpoint().equals("userstream") ) {
-
-            hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
-
-            UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
-            userstreamEndpoint.withFollowings(true);
-            userstreamEndpoint.withUser(false);
-            userstreamEndpoint.allReplies(false);
-            endpoint = userstreamEndpoint;
+      if ( track || follow ) {
+        LOGGER.debug("***\tPRESENT\t***");
+        StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
+        if ( track ) {
+          statusesFilterEndpoint.trackTerms(config.getTrack());
         }
-        else if(config.getEndpoint().equals("sample") ) {
-
-            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
-
-            boolean track = config.getTrack() != null && !config.getTrack().isEmpty();
-            boolean follow = config.getFollow() != null && !config.getFollow().isEmpty();
-
-            if( track || follow ) {
-                LOGGER.debug("***\tPRESENT\t***");
-                StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
-                if( track ) {
-                    statusesFilterEndpoint.trackTerms(config.getTrack());
-                }
-                if( follow ) {
-                    statusesFilterEndpoint.followings(config.getFollow());
-                }
-                this.endpoint = statusesFilterEndpoint;
-            } else {
-                endpoint = new StatusesSampleEndpoint();
-            }
-
+        if ( follow ) {
+          statusesFilterEndpoint.followings(config.getFollow());
         }
-        else if( config.getEndpoint().endsWith("firehose")) {
-            hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
-            endpoint = new StatusesFirehoseEndpoint();
-        } else {
-            LOGGER.error("NO ENDPOINT RESOLVED");
-            return;
-        }
-
-        if( config.getBasicauth() != null ) {
-
-            Preconditions.checkNotNull(config.getBasicauth().getUsername());
-            Preconditions.checkNotNull(config.getBasicauth().getPassword());
-
-            auth = new BasicAuth(
-                    config.getBasicauth().getUsername(),
-                    config.getBasicauth().getPassword()
-            );
-
-        } else if( config.getOauth() != null ) {
-
-            Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-            Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-            Preconditions.checkNotNull(config.getOauth().getAccessToken());
-            Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
-            auth = new OAuth1(config.getOauth().getConsumerKey(),
-                    config.getOauth().getConsumerSecret(),
-                    config.getOauth().getAccessToken(),
-                    config.getOauth().getAccessTokenSecret());
+        this.endpoint = statusesFilterEndpoint;
+      } else {
+        endpoint = new StatusesSampleEndpoint();
+      }
+
+    } else if ( config.getEndpoint().endsWith("firehose")) {
+      hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
+      endpoint = new StatusesFirehoseEndpoint();
+    } else {
+      LOGGER.error("NO ENDPOINT RESOLVED");
+      return;
+    }
 
-        } else {
-            LOGGER.error("NO AUTH RESOLVED");
-            return;
-        }
+    if ( config.getBasicauth() != null ) {
 
-        LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth);
+      Preconditions.checkNotNull(config.getBasicauth().getUsername());
+      Preconditions.checkNotNull(config.getBasicauth().getPassword());
 
-        providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
+      auth = new BasicAuth(
+          config.getBasicauth().getUsername(),
+          config.getBasicauth().getPassword()
+      );
 
-        client = new ClientBuilder()
-            .name("apache/streams/streams-contrib/streams-provider-twitter")
-            .hosts(hosebirdHosts)
-            .endpoint(endpoint)
-            .authentication(auth)
-            .connectionTimeout(1200000)
-            .processor(processor)
-            .build();
+    } else if ( config.getOauth() != null ) {
 
-    }
+      Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+      Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+      Preconditions.checkNotNull(config.getOauth().getAccessToken());
+      Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
 
-    @Override
-    public void cleanUp() {
-        this.client.stop();
-        this.processor.cleanUp();
-        this.running.set(false);
-    }
+      auth = new OAuth1(config.getOauth().getConsumerKey(),
+          config.getOauth().getConsumerSecret(),
+          config.getOauth().getAccessToken(),
+          config.getOauth().getAccessTokenSecret());
 
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        return countersTotal;
+    } else {
+      LOGGER.error("NO AUTH RESOLVED");
+      return;
     }
 
-    protected boolean addDatum(Future<List<StreamsDatum>> future) {
-        try {
-            ComponentUtils.offerUntilSuccess(future, providerQueue);
-            countersCurrent.incrementStatus(DatumStatus.SUCCESS);
-            return true;
-        } catch (Exception e) {
-            countersCurrent.incrementStatus(DatumStatus.FAIL);
-            LOGGER.warn("Unable to enqueue item from Twitter stream");
-            return false;
-        }
+    LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth);
+
+    providerQueue = new LinkedBlockingQueue<>(MAX_BATCH);
+
+    client = new ClientBuilder()
+        .name("apache/streams/streams-contrib/streams-provider-twitter")
+        .hosts(hosebirdHosts)
+        .endpoint(endpoint)
+        .authentication(auth)
+        .connectionTimeout(1200000)
+        .processor(processor)
+        .build();
+
+  }
+
+  @Override
+  public void cleanUp() {
+    this.client.stop();
+    this.processor.cleanUp();
+    this.running.set(false);
+  }
+
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    return countersTotal;
+  }
+
+  protected boolean addDatum(Future<List<StreamsDatum>> future) {
+    try {
+      ComponentUtils.offerUntilSuccess(future, providerQueue);
+      countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+      return true;
+    } catch (Exception ex) {
+      countersCurrent.incrementStatus(DatumStatus.FAIL);
+      LOGGER.warn("Unable to enqueue item from Twitter stream");
+      return false;
     }
-
-    protected void drainTo(Queue<StreamsDatum> drain) {
-        int count = 0;
-        while(!providerQueue.isEmpty() && count <= MAX_BATCH) {
-            for(StreamsDatum datum : pollForDatum()) {
-                ComponentUtils.offerUntilSuccess(datum, drain);
-                count++;
-            }
-        }
+  }
+
+  protected void drainTo(Queue<StreamsDatum> drain) {
+    int count = 0;
+    while (!providerQueue.isEmpty() && count <= MAX_BATCH) {
+      for (StreamsDatum datum : pollForDatum()) {
+        ComponentUtils.offerUntilSuccess(datum, drain);
+        count++;
+      }
     }
-
-    protected List<StreamsDatum> pollForDatum()  {
-        try {
-            return providerQueue.poll().get();
-        } catch (InterruptedException e) {
-            LOGGER.warn("Interrupted while waiting for future.  Initiate shutdown.");
-            this.cleanUp();
-            Thread.currentThread().interrupt();
-            return new ArrayList<>();
-        } catch (ExecutionException e) {
-            LOGGER.warn("Error getting tweet from future");
-            return new ArrayList<>();
-        }
+  }
+
+  protected List<StreamsDatum> pollForDatum()  {
+    try {
+      return providerQueue.poll().get();
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Interrupted while waiting for future.  Initiate shutdown.");
+      this.cleanUp();
+      Thread.currentThread().interrupt();
+      return new ArrayList<>();
+    } catch (ExecutionException ex) {
+      LOGGER.warn("Error getting tweet from future");
+      return new ArrayList<>();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index cea9829..7461356 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,6 +18,17 @@
 
 package org.apache.streams.twitter.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -31,17 +42,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
-import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +65,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,320 +74,335 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 
 /**
- *  Retrieve recent posts from a list of user ids or names.
- *
- *  To use from command line:
- *
- *  Supply (at least) the following required configuration in application.conf:
- *
- *  twitter.oauth.consumerKey
- *  twitter.oauth.consumerSecret
- *  twitter.oauth.accessToken
- *  twitter.oauth.accessTokenSecret
- *  twitter.info
- *
- *  Launch using:
- *
- *  mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
+ * Retrieve recent posts from a list of user ids or names.
  */
 public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
-    public final static String STREAMS_ID = "TwitterTimelineProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
-
-    public static final int MAX_NUMBER_WAITING = 10000;
-
-    private TwitterUserInformationConfiguration config;
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public TwitterUserInformationConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(TwitterUserInformationConfiguration config) {
-        this.config = config;
-    }
-
-    protected Collection<String[]> screenNameBatches;
-    protected Collection<Long> ids;
+  public static final String STREAMS_ID = "TwitterTimelineProvider";
 
-    protected volatile Queue<StreamsDatum> providerQueue;
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
-    protected int idsCount;
-    protected Twitter client;
+  public static final int MAX_NUMBER_WAITING = 10000;
 
-    protected ListeningExecutorService executor;
+  private TwitterUserInformationConfiguration config;
 
-    protected DateTime start;
-    protected DateTime end;
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    protected final AtomicBoolean running = new AtomicBoolean();
+  public TwitterUserInformationConfiguration getConfig() {
+    return config;
+  }
 
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
+  public void setConfig(TwitterUserInformationConfiguration config) {
+    this.config = config;
+  }
 
-    Boolean jsonStoreEnabled;
-    Boolean includeEntitiesEnabled;
+  protected Collection<String[]> screenNameBatches;
+  protected Collection<Long> ids;
 
-    public static void main(String[] args) throws Exception {
+  protected volatile Queue<StreamsDatum> providerQueue;
 
-        Preconditions.checkArgument(args.length >= 2);
+  protected int idsCount;
+  protected Twitter client;
 
-        String configfile = args[0];
-        String outfile = args[1];
+  protected ListeningExecutorService executor;
 
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+  protected DateTime start;
+  protected DateTime end;
 
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
-        TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+  List<ListenableFuture<Object>> futures = new ArrayList<>();
 
-        ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+  Boolean jsonStoreEnabled;
+  Boolean includeEntitiesEnabled;
 
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
-    }
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply (at least) the following required configuration in application.conf:
+   *
+   * <p/>
+   * twitter.oauth.consumerKey
+   * twitter.oauth.consumerSecret
+   * twitter.oauth.accessToken
+   * twitter.oauth.accessTokenSecret
+   * twitter.info
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json"
+   *
+   * @param args config file path followed by output file path
+   * @throws Exception if loading the configuration or opening the output file fails
+   */
+  public static void main(String[] args) throws Exception {
 
-    public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
-        this.config = config;
-    }
+    Preconditions.checkArgument(args.length >= 2);
 
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
+    String configfile = args[0];
+    String outfile = args[1];
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false));
 
-    @Override
-    public void prepare(Object o) {
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
 
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
+    TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
 
+    ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
 
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
         try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
         }
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getInfo());
-
-        consolidateToIDs();
-
-        if(ids.size() > 1)
-            executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
-        else
-            executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+      }
+    }
+    while ( provider.isRunning() );
+    provider.cleanUp();
+    outStream.flush();
+  }
+
+  public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
+    this.config = config;
+  }
+
+  public Queue<StreamsDatum> getProviderQueue() {
+    return this.providerQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    try {
+      lock.writeLock().lock();
+      providerQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
     }
 
-    @Override
-    public void startStream() {
+    Preconditions.checkNotNull(providerQueue);
+    Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+    Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+    Preconditions.checkNotNull(config.getOauth().getAccessToken());
+    Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+    Preconditions.checkNotNull(config.getInfo());
 
-        LOGGER.debug("{} startStream", STREAMS_ID);
+    consolidateToIDs();
 
-        Preconditions.checkArgument(!ids.isEmpty());
+    if (ids.size() > 1) {
+      executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
+    } else {
+      executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+    }
+  }
 
-        running.set(true);
+  @Override
+  public void startStream() {
 
-        submitTimelineThreads(ids.toArray(new Long[0]));
+    LOGGER.debug("{} startStream", STREAMS_ID);
 
-        executor.shutdown();
+    Preconditions.checkArgument(!ids.isEmpty());
 
-    }
+    running.set(true);
 
-    public boolean shouldContinuePulling(List<Status> statuses) {
-        return (statuses != null) && (statuses.size() > 0);
-    }
+    submitTimelineThreads(ids.toArray(new Long[0]));
 
-    protected void submitTimelineThreads(Long[] ids) {
+    executor.shutdown();
 
-        Twitter client = getTwitterClient();
+  }
 
-        for(int i = 0; i < ids.length; i++) {
+  public boolean shouldContinuePulling(List<Status> statuses) {
+    return (statuses != null) && (statuses.size() > 0);
+  }
 
-            TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
-            ListenableFuture future = executor.submit(providerTask);
-            futures.add(future);
-            LOGGER.info("submitted {}", ids[i]);
-        }
+  protected void submitTimelineThreads(Long[] ids) {
 
-    }
+    Twitter client = getTwitterClient();
 
-    private Collection<Long> retrieveIds(String[] screenNames) {
-        Twitter client = getTwitterClient();
+    for (int i = 0; i < ids.length; i++) {
 
-        List<Long> ids = Lists.newArrayList();
-        try {
-            for (User tStat : client.lookupUsers(screenNames)) {
-                ids.add(tStat.getId());
-            }
-        } catch (TwitterException e) {
-            LOGGER.error("Failure retrieving user details.", e.getMessage());
-        }
-        return ids;
+      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("submitted {}", ids[i]);
     }
 
-    public StreamsResultSet readCurrent() {
+  }
 
-        StreamsResultSet result;
+  private Collection<Long> retrieveIds(String[] screenNames) {
+    Twitter client = getTwitterClient();
 
-        LOGGER.debug("Providing {} docs", providerQueue.size());
+    List<Long> ids = Lists.newArrayList();
+    try {
+      for (User twitterUser : client.lookupUsers(screenNames)) {
+        ids.add(twitterUser.getId());
+      }
+    } catch (TwitterException ex) { // best effort: a failed lookup is logged, not rethrown
+      LOGGER.error("Failure retrieving user details.", ex); // pass the throwable so the cause is logged
+    }
+    return ids; // stays empty when the lookup failed
+  }
 
-        try {
-            lock.writeLock().lock();
-            result = new StreamsResultSet(providerQueue);
-            result.setCounter(new DatumStatusCounter());
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
-            LOGGER.info("Finished.  Cleaning up...");
+    StreamsResultSet result;
 
-            running.set(false);
+    LOGGER.debug("Providing {} docs", providerQueue.size());
 
-            LOGGER.info("Exiting");
-        }
+    try {
+      lock.writeLock().lock();
+      result = new StreamsResultSet(providerQueue);
+      result.setCounter(new DatumStatusCounter());
+      providerQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
 
-        return result;
+    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
+      LOGGER.info("Finished.  Cleaning up...");
 
-    }
+      running.set(false);
 
-    protected Queue<StreamsDatum> constructQueue() {
-        return new LinkedBlockingQueue<StreamsDatum>();
+      LOGGER.info("Exiting");
     }
 
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
+    return result;
 
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        throw new NotImplementedException();
-    }
+  }
 
+  protected Queue<StreamsDatum> constructQueue() {
+    return new LinkedBlockingQueue<StreamsDatum>();
+  }
 
+  public StreamsResultSet readNew(BigInteger sequence) {
+    LOGGER.debug("{} readNew", STREAMS_ID);
+    throw new NotImplementedException();
+  }
 
-    /**
-     * Using the "info" list that is contained in the configuration, ensure that all
-     * account identifiers are converted to IDs (Longs) instead of screenNames (Strings)
-     */
-    protected void consolidateToIDs() {
-        List<String> screenNames = Lists.newArrayList();
-        ids = Lists.newArrayList();
-
-        for(String account : config.getInfo()) {
-            try {
-                if (new Long(account) != null) {
-                    ids.add(Long.parseLong(Objects.toString(account, null)));
-                } else {
-                    screenNames.add(account);
-                }
-            } catch (Exception e) {
-                LOGGER.error("Exception while trying to add ID: {{}}, {}", account, e);
-            }
-        }
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    LOGGER.debug("{} readRange", STREAMS_ID);
+    throw new NotImplementedException();
+  }
 
-        // Twitter allows for batches up to 100 per request, but you cannot mix types
-        screenNameBatches = new ArrayList<String[]>();
-        while(screenNames.size() >= 100) {
-            screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
-            screenNames = screenNames.subList(100, screenNames.size());
-        }
 
-        if(screenNames.size() > 0)
-            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
 
-        Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
+  /**
+   * Split the configured "info" entries into numeric user IDs and screen names, then resolve
+   * the screen names to IDs in batches of at most 100, appending the resolved IDs to ids.
+   */
+  protected void consolidateToIDs() {
+    List<String> screenNames = Lists.newArrayList();
+    ids = Lists.newArrayList();
 
-        while(screenNameBatchIterator.hasNext()) {
-            Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
-            ids.addAll(batchIds);
+    for (String account : config.getInfo()) {
+      try {
+        if (Objects.toString(account, "").trim().isEmpty()) {
+          LOGGER.error("Skipping blank account identifier: '{}'", account);
+        } else {
+          ids.add(Long.parseLong(account.trim())); // numeric entries are already user ids
         }
+      } catch (NumberFormatException ex) {
+        screenNames.add(account.trim()); // not numeric: resolve it as a screen name below
+      }
     }
 
-    public Twitter getTwitterClient() {
-
-        String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-        ConfigurationBuilder builder = new ConfigurationBuilder()
-                .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                .setOAuthAccessToken(config.getOauth().getAccessToken())
-                .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                .setIncludeEntitiesEnabled(true)
-                .setJSONStoreEnabled(true)
-                .setAsyncNumThreads(3)
-                .setRestBaseURL(baseUrl)
-                .setIncludeMyRetweetEnabled(Boolean.TRUE)
-                .setPrettyDebugEnabled(Boolean.TRUE);
-
-        return new TwitterFactory(builder.build()).getInstance();
+    // Twitter allows for batches up to 100 per request, but you cannot mix types
+    screenNameBatches = new ArrayList<String[]>();
+    while (screenNames.size() >= 100) {
+      screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
+      screenNames = screenNames.subList(100, screenNames.size());
     }
 
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
+    if (screenNames.size() > 0) {
+      screenNameBatches.add(screenNames.toArray(new String[0])); // exact size, no null padding
     }
 
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
+    Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator();
 
-    @Override
-    public boolean isRunning() {
-        if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            running.set(false);
-            LOGGER.info("Exiting");
+    while (screenNameBatchIterator.hasNext()) {
+      Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next());
+      ids.addAll(batchIds);
+    }
+  }
+
+  /**
+   * Build a Twitter client from the OAuth credentials and REST base URL of the provider config.
+   * @return the instance obtained from a TwitterFactory created with that configuration
+   */
+  public Twitter getTwitterClient() {
+
+    String baseUrl = TwitterProviderUtil.baseUrl(config);
+
+    ConfigurationBuilder builder = new ConfigurationBuilder()
+        .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+        .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+        .setOAuthAccessToken(config.getOauth().getAccessToken())
+        .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+        .setIncludeEntitiesEnabled(true)
+        .setJSONStoreEnabled(true)
+        .setAsyncNumThreads(3)
+        .setRestBaseURL(baseUrl)
+        .setIncludeMyRetweetEnabled(Boolean.TRUE)
+        .setPrettyDebugEnabled(Boolean.TRUE);
+
+    return new TwitterFactory(builder.build()).getInstance();
+  }
+
+  @Override
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
+
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
         }
-        return running.get();
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public boolean isRunning() {
+    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      running.set(false);
+      LOGGER.info("Exiting");
     }
+    return running.get();
+  }
 }