You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2018/06/11 16:16:36 UTC

[streams] branch STREAMS-605 created (now 66508b0)

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a change to branch STREAMS-605
in repository https://gitbox.apache.org/repos/asf/streams.git.


      at 66508b0  resolves STREAMS-605

This branch includes the following new commits:

     new 66508b0  resolves STREAMS-605

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
sblackmon@apache.org.

[streams] 01/01: resolves STREAMS-605

Posted by sb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch STREAMS-605
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 66508b0cb3ff198b58a43cc90eeb1299bbcafb57
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
AuthorDate: Mon Jun 11 11:16:25 2018 -0500

    resolves STREAMS-605
---
 .../twitter/provider/SevenDaySearchProvider.java   | 119 +++++++-------
 .../provider/SevenDaySearchProviderTask.java       |  13 +-
 .../twitter/provider/ThirtyDaySearchProvider.java  | 121 +++++++-------
 .../provider/ThirtyDaySearchProviderTask.java      |  14 +-
 .../twitter/provider/TwitterEngagersProvider.java  | 175 +++++++++++----------
 .../provider/TwitterFollowersIdsProviderTask.java  |  18 ++-
 .../provider/TwitterFollowersListProviderTask.java |  20 ++-
 .../twitter/provider/TwitterFollowingProvider.java | 149 +++++++++---------
 .../provider/TwitterFriendsIdsProviderTask.java    |  17 +-
 .../provider/TwitterFriendsListProviderTask.java   |  23 ++-
 .../twitter/provider/TwitterRetweetsTask.java      |   9 +-
 .../twitter/provider/TwitterTimelineProvider.java  |  99 +++++++-----
 .../provider/TwitterTimelineProviderTask.java      |  16 +-
 .../provider/TwitterUserInformationProvider.java   |  60 +++----
 14 files changed, 489 insertions(+), 364 deletions(-)

diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
index 10a2661..2c76a5f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java
@@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.SevenDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.SevenDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@ import java.util.stream.Stream;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class SevenDaySearchProvider implements StreamsProvider, Serializable {
+public class SevenDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "SevenDaySearchProvider";
 
@@ -91,11 +98,14 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  StreamsConfiguration streamsConfiguration;
+
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -136,26 +146,21 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
     SevenDaySearchProviderConfiguration config = new ComponentConfigurator<>(SevenDaySearchProviderConfiguration.class).detectConfiguration();
     SevenDaySearchProvider provider = new SevenDaySearchProvider(config);
 
-    ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public SevenDaySearchProvider() {
@@ -184,7 +189,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -200,7 +205,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
     request = new SevenDaySearchRequest();
     request.setQ(config.getQ());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -211,7 +216,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -224,24 +229,32 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-      SevenDaySearchProviderTask providerTask = new SevenDaySearchProviderTask(
+      Callable providerTask = new SevenDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
 
   }
 
@@ -255,8 +268,7 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -273,10 +285,6 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,35 +307,30 @@ public class SevenDaySearchProvider implements StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).distinct().iterator();
+
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
index 8bb0b0f..ac094dc 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProviderTask.java
@@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class SevenDaySearchProviderTask implements Runnable {
+public class SevenDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SevenDaySearchProviderTask.class);
 
@@ -48,6 +50,7 @@ public class SevenDaySearchProviderTask implements Runnable {
   protected SevenDaySearchProvider provider;
   protected Twitter client;
   protected SevenDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * SevenDaySearchProviderTask constructor.
@@ -76,6 +79,8 @@ public class SevenDaySearchProviderTask implements Runnable {
 
       List<Tweet> statuses = response.getStatuses();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -109,6 +114,10 @@ public class SevenDaySearchProviderTask implements Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
index 080a30d..287ab4f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProvider.java
@@ -25,11 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.ThirtyDaySearchProviderConfiguration;
 import org.apache.streams.twitter.api.ThirtyDaySearchRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +58,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +76,7 @@ import java.util.stream.Stream;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
+public class ThirtyDaySearchProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "ThirtyDaySearchProvider";
 
@@ -91,11 +98,14 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
-  protected final AtomicBoolean running = new AtomicBoolean();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  StreamsConfiguration streamsConfiguration;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected final AtomicBoolean running = new AtomicBoolean();
 
   /**
    * To use from command line:
@@ -137,26 +147,21 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
     ThirtyDaySearchProviderConfiguration config = new ComponentConfigurator<>(ThirtyDaySearchProviderConfiguration.class).detectConfiguration();
     ThirtyDaySearchProvider provider = new ThirtyDaySearchProvider(config);
 
-    ObjectMapper mapper = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
-
     PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
+    Iterator<Tweet> results = provider.call();
+
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
+
   }
 
   public ThirtyDaySearchProvider(ThirtyDaySearchProviderConfiguration config) {
@@ -181,7 +186,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -197,7 +202,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
     request = new ThirtyDaySearchRequest();
     request.setQuery(config.getQuery());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     try {
       client = getTwitterClient();
@@ -208,7 +213,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
     Objects.requireNonNull(client);
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -221,25 +226,32 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitSearchThread() {
 
-    ThirtyDaySearchProviderTask providerTask = new ThirtyDaySearchProviderTask(
+    Callable providerTask = new ThirtyDaySearchProviderTask(
           this,
           client,
         request
       );
-      ListenableFuture future = executor.submit(providerTask);
-      futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    LOGGER.info("Thread Created: {}", request);
+    tasks.add(providerTask);
+    Future future = executor.submit(providerTask);
+    futures.add(future);
+    LOGGER.info("Thread Submitted: {}", request);
   }
 
   @Override
@@ -252,8 +264,7 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -270,10 +281,6 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -296,35 +303,29 @@ public class ThirtyDaySearchProvider implements StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument())).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
index 07b40e5..dea770b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/ThirtyDaySearchProviderTask.java
@@ -31,14 +31,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class ThirtyDaySearchProviderTask implements Runnable {
+public class ThirtyDaySearchProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ThirtyDaySearchProviderTask.class);
 
@@ -47,6 +49,7 @@ public class ThirtyDaySearchProviderTask implements Runnable {
   protected ThirtyDaySearchProvider provider;
   protected Twitter client;
   protected ThirtyDaySearchRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * ThirtyDaySearchProviderTask constructor.
@@ -75,6 +78,8 @@ public class ThirtyDaySearchProviderTask implements Runnable {
 
       List<Tweet> statuses = response.getResults();
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -107,6 +112,9 @@ public class ThirtyDaySearchProviderTask implements Runnable {
         && page_count <= provider.getConfig().getMaxPages());
   }
 
-
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
index bbb2881..e244f50 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEngagersProvider.java
@@ -24,8 +24,11 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.config.TwitterEngagersProviderConfiguration;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.RetweetsRequest;
@@ -36,6 +39,7 @@ import org.apache.streams.twitter.pojo.User;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -58,8 +62,11 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +80,7 @@ import static org.apache.streams.twitter.provider.TwitterUserInformationProvider
 /**
  * Retrieve posts from a list of user ids or names, then provide all of the users who retweeted those posts.
  */
-public class TwitterEngagersProvider extends TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterEngagersProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterEngagersProvider";
 
@@ -91,14 +98,19 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
+
+  public static ExecutorService executor;
 
   StreamsConfiguration streamsConfiguration;
 
   RetweetsRequest baseRetweetsRequest;
 
+  TwitterTimelineProvider timelineProvider;
+
   /**
    * To use from command line:
    *
@@ -134,29 +146,22 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
     Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)).withFallback(StreamsConfigurator.getConfig());
     StreamsConfigurator.addConfig(testResourceConfig);
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterEngagersProviderConfiguration config = new ComponentConfigurator<>(TwitterEngagersProviderConfiguration.class).detectConfiguration();
     TwitterEngagersProvider provider = new TwitterEngagersProvider(config);
 
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+    Iterator<User> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning() );
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -176,27 +181,37 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
   @Override
   public void prepare(Object configurationObject) {
 
+    timelineProvider = new TwitterTimelineProvider(config);
+
     if( configurationObject instanceof TwitterEngagersProviderConfiguration ) {
       this.config = (TwitterEngagersProviderConfiguration)configurationObject;
-      super.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class));
+      timelineProvider.prepare(MAPPER.convertValue(this.config, TwitterTimelineProviderConfiguration.class));
     } else if( configurationObject instanceof TwitterTimelineProviderConfiguration ) {
-      super.prepare(configurationObject);
+      timelineProvider.prepare(configurationObject);
       this.config = MAPPER.convertValue(this.config, TwitterEngagersProviderConfiguration.class);
     } else {
-      super.prepare(null);
+      timelineProvider.prepare(null);
     }
 
     streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
+
     executor = MoreExecutors.listeningDecorator(
-      TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
         config.getThreadsPerProvider().intValue(),
         streamsConfiguration.getQueueSize().intValue()
       )
@@ -206,41 +221,50 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
 
   }
 
+  /**
+   * get Twitter Client from TwitterUserInformationConfiguration.
+   * @return result
+   */
+  public Twitter getTwitterClient() throws InstantiationException {
+
+    return Twitter.getInstance(config);
+
+  }
+
   @Override
   public void startStream() {
 
     LOGGER.debug("{} startStream", STREAMS_ID);
 
-    super.startStream();
+    Iterator<Tweet> timelineIterator = timelineProvider.call();
 
-    do {
-      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = super.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
-        Tweet tweet = (Tweet) datum.getDocument();
-        submitRetweeterIdsTaskThread(tweet.getId());
-      }
-    }
-    while ( super.isRunning() );
-    super.cleanUp();
-    executor.shutdown();
+    List<Tweet> timelineList = Lists.newArrayList(timelineIterator);
+
+    LOGGER.info("running: {}", running.get());
+
+    timelineList.forEach(tweet -> submitRetweeterIdsTaskThread(tweet.getId()));
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
   protected void submitRetweeterIdsTaskThread( Long postId ) {
 
+    Callable<Object> callable = createTask(postId);
+    LOGGER.info("Thread Created: {}", postId);
+    tasks.add(callable);
+    futures.add(executor.submit(callable));
+    LOGGER.info("Thread Submitted: {}", postId);
+
+  }
+
+  protected Callable createTask( Long postId ) {
     RetweetsRequest request = new ComponentConfigurator<>(RetweetsRequest.class).detectConfiguration();
     request.setId(postId);
-      TwitterRetweetsTask providerTask = new TwitterRetweetsTask(
-      this,
-      client,
-      request
-    );
-    ListenableFuture future = executor.submit(providerTask);
-    super.futures.add(future);
-    LOGGER.info("Thread Submitted: {}", providerTask.request);
-
+    Callable callable = new TwitterRetweetsTask(this, client, request);
+    return callable;
   }
 
   @Override
@@ -252,7 +276,7 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
 
     try {
       lock.writeLock().lock();
-      Queue<StreamsDatum> resultQueue = constructQueue();
+      Queue<StreamsDatum> resultQueue = QueueUtils.constructQueue();
       providerQueue.iterator().forEachRemaining(
         datum -> {
           Tweet tweet = ((Tweet) datum.getDocument());
@@ -266,12 +290,12 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
         }
       );
       result = new StreamsResultSet(resultQueue);
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
 
-    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) {
+    if ( result.size() == 0 && providerQueue.isEmpty() && executor.isShutdown() && executor.isTerminated() ) {
       LOGGER.info("Finished.  Cleaning up...");
 
       running.set(false);
@@ -283,10 +307,6 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -299,36 +319,31 @@ public class TwitterEngagersProvider extends TwitterTimelineProvider implements
 
   @Override
   public void cleanUp() {
-    super.cleanUp();
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("timelineProvider.isRunning: {}", timelineProvider.isRunning());
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( timelineProvider.isRunning() == false && tasks.size() > 0 && tasks.size() == futures.size() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<User> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> ((Tweet)x.getDocument()).getUser()).distinct().iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
index 7acbeef..0b2305a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
@@ -22,6 +22,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.api.FollowersIdsRequest;
 import org.apache.streams.twitter.api.FollowersIdsResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
@@ -32,10 +33,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersIdsProviderTask implements Runnable {
+public class TwitterFollowersIdsProviderTask implements Callable<Iterator<FollowersIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class);
 
@@ -44,6 +50,7 @@ public class TwitterFollowersIdsProviderTask implements Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersIdsRequest request;
+  protected List<FollowersIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +69,8 @@ public class TwitterFollowersIdsProviderTask implements Runnable {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFollowersIds(request);
@@ -81,6 +90,8 @@ public class TwitterFollowersIdsProviderTask implements Runnable {
 
       FollowersIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -122,4 +133,9 @@ public class TwitterFollowersIdsProviderTask implements Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
index 5195a13..94ec455 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -28,13 +28,19 @@ import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFollowersListProviderTask implements Runnable {
+public class TwitterFollowersListProviderTask implements Callable<Iterator<FollowersListResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
 
@@ -43,6 +49,7 @@ public class TwitterFollowersListProviderTask implements Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FollowersListRequest request;
+  protected List<FollowersListResponse> responseList;
 
   /**
    * TwitterFollowersListProviderTask constructor.
@@ -59,8 +66,12 @@ public class TwitterFollowersListProviderTask implements Runnable {
   @Override
   public void run() {
 
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     getFollowersList(request);
 
     LOGGER.info("Thread Finished: {}", request.toString());
@@ -78,6 +89,8 @@ public class TwitterFollowersListProviderTask implements Runnable {
 
       FollowersListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -118,4 +131,9 @@ public class TwitterFollowersListProviderTask implements Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FollowersListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 7e6949c..3b45cf3 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -25,6 +25,8 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.StreamsJacksonMapperConfiguration;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
@@ -34,6 +36,7 @@ import org.apache.streams.twitter.api.FriendsIdsRequest;
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,6 +46,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
@@ -59,26 +63,34 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static sun.misc.PostVMInitHook.run;
 
 /**
  * Retrieve all follow adjacencies from a list of user ids or names.
  */
-public class TwitterFollowingProvider implements StreamsProvider, Serializable {
+public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, StreamsProvider, Serializable {
 
   public static final String STREAMS_ID = "TwitterFollowingProvider";
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
+  private StreamsConfiguration streamsConfiguration;
   private TwitterFollowingConfiguration config;
 
   protected List<String> names = new ArrayList<>();
@@ -86,9 +98,10 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
 
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  public static ExecutorService executor;
 
-  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   protected final AtomicBoolean running = new AtomicBoolean();
 
@@ -129,31 +142,22 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
     Config configFile = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults());
     StreamsConfigurator.addConfig(configFile);
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance(new StreamsJacksonMapperConfiguration().withDateFormats(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())));
+
     TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration();
     TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
 
-    StreamsJacksonMapperConfiguration mapperConfiguration = new StreamsJacksonMapperConfiguration()
-        .withDateFormats(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
-    ObjectMapper mapper = StreamsJacksonMapper.getInstance(mapperConfiguration);
+    Iterator<Follow> results = provider.call();
 
-    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-    provider.prepare(config);
-    provider.startStream();
-    do {
-      Thread.sleep(streamsConfiguration.getBatchFrequencyMs());
-      for (StreamsDatum datum : provider.readCurrent()) {
-        String json;
-        try {
-          json = mapper.writeValueAsString(datum.getDocument());
-          outStream.println(json);
-        } catch (JsonProcessingException ex) {
-          System.err.println(ex.getMessage());
-        }
+    results.forEachRemaining(d -> {
+      try {
+        outStream.println(mapper.writeValueAsString(d));
+      } catch( Exception e ) {
+        LOGGER.warn("Exception", e);
       }
-    }
-    while ( provider.isRunning());
-    provider.cleanUp();
+    });
+
     outStream.flush();
   }
 
@@ -178,6 +182,8 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
 
   public void prepare(Object configurationObject) {
 
+    this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
+
     if( configurationObject instanceof TwitterFollowingConfiguration) {
       this.config = (TwitterFollowingConfiguration) configurationObject;
     }
@@ -203,7 +209,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -228,7 +234,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
     Objects.requireNonNull(getConfig().getEndpoint());
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -237,12 +243,18 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
     Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
 
     for (Long id : ids) {
-      submitTask(createTask(id, null));
+      Callable<Object> callable = createTask(id, null);
+      LOGGER.info("Thread Created: {}", id);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", id);
     }
 
     for (String name : names) {
-      submitTask(createTask(null, name));
+      Callable<Object> callable = createTask(null, name);
+      LOGGER.info("Thread Created: {}", name);
+      tasks.add(callable);
+      futures.add(executor.submit(callable));
       LOGGER.info("Thread Submitted: {}", name);
     }
 
@@ -256,45 +268,42 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
 
     running.set(true);
 
-    LOGGER.info("isRunning");
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
 
   }
 
-  protected Runnable createTask(Long id, String name) {
+  protected Callable createTask(Long id, String name) {
     if( config.getEndpoint().equals("friends") && config.getIdsOnly() == true ) {
       FriendsIdsRequest request = (FriendsIdsRequest)new FriendsIdsRequest().withId(id).withScreenName(name);
       return new TwitterFriendsIdsProviderTask(
-              this,
-              client,
-              request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("friends") && config.getIdsOnly() == false ) {
       FriendsListRequest request = (FriendsListRequest)new FriendsListRequest().withId(id).withScreenName(name);
       return new TwitterFriendsListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == true ) {
       FollowersIdsRequest request = (FollowersIdsRequest)new FollowersIdsRequest().withId(id).withScreenName(name);
       return new TwitterFollowersIdsProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else if( config.getEndpoint().equals("followers") && config.getIdsOnly() == false ) {
       FollowersListRequest request = (FollowersListRequest)new FollowersListRequest().withId(id).withScreenName(name);
       return new TwitterFollowersListProviderTask(
-          this,
-          client,
-          request);
+        this,
+        client,
+        request);
     } else return null;
   }
 
-  protected void submitTask(Runnable providerTask) {
-    ListenableFuture future = executor.submit(providerTask);
-    futures.add(future);
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
@@ -306,8 +315,7 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -332,40 +340,29 @@ public class TwitterFollowingProvider implements StreamsProvider, Serializable {
   }
 
   public boolean isRunning() {
-    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
-      LOGGER.info("All Threads Completed");
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: {}", running.get());
     return running.get();
   }
 
-  // abstract this out
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
+  public void cleanUp() {
+    // cleanUp
   }
 
-  // abstract this out
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+  @Override
+  public Iterator<Follow> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (Follow)x.getDocument()).iterator();
   }
 
-  public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
index 5a53123..402e825 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -32,10 +32,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsIdsProviderTask implements Runnable {
+public class TwitterFriendsIdsProviderTask implements Callable<Iterator<FriendsIdsResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
 
@@ -44,6 +49,7 @@ public class TwitterFriendsIdsProviderTask implements Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsIdsRequest request;
+  protected List<FriendsIdsResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -62,6 +68,8 @@ public class TwitterFriendsIdsProviderTask implements Runnable {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
     LOGGER.info("Thread Starting: {}", request.toString());
 
     getFriendsIds(request);
@@ -81,6 +89,8 @@ public class TwitterFriendsIdsProviderTask implements Runnable {
 
       FriendsIdsResponse response = client.ids(request);
 
+      responseList.add(response);
+
       last_count = response.getIds().size();
 
       if (response.getIds().size() > 0) {
@@ -123,4 +133,9 @@ public class TwitterFriendsIdsProviderTask implements Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsIdsResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
index e116f2f..b2917e2 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -20,6 +20,8 @@ package org.apache.streams.twitter.provider;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
 import org.apache.streams.twitter.api.FriendsListRequest;
 import org.apache.streams.twitter.api.FriendsListResponse;
 import org.apache.streams.twitter.api.Twitter;
@@ -32,10 +34,15 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
 /**
  *  Retrieve friend or follower connections for a single user id.
  */
-public class TwitterFriendsListProviderTask implements Runnable {
+public class TwitterFriendsListProviderTask implements Callable<Iterator<FriendsListResponse>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
 
@@ -44,6 +51,7 @@ public class TwitterFriendsListProviderTask implements Runnable {
   protected Twitter client;
   protected TwitterFollowingProvider provider;
   protected FriendsListRequest request;
+  protected List<FriendsListResponse> responseList;
 
   /**
    * TwitterFollowingProviderTask constructor.
@@ -67,9 +75,13 @@ public class TwitterFriendsListProviderTask implements Runnable {
 
     Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
 
+    responseList = new ArrayList<>();
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
     getFriendsList(request);
 
-    LOGGER.info(request.getId() != null ? request.getId().toString() : request.getScreenName() + " Thread Finished");
+    LOGGER.info("Thread Finished: {}", request.toString());
 
   }
 
@@ -79,6 +91,8 @@ public class TwitterFriendsListProviderTask implements Runnable {
 
       FriendsListResponse response = client.list(request);
 
+      responseList.add(response);
+
       last_count = response.getUsers().size();
 
       if (response.getUsers().size() > 0) {
@@ -121,4 +135,9 @@ public class TwitterFriendsListProviderTask implements Runnable {
             && page_count <= provider.getConfig().getMaxPages());
   }
 
+  @Override
+  public Iterator<FriendsListResponse> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
index acbe008..b1b39b1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterRetweetsTask.java
@@ -32,14 +32,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterRetweetsTask implements Runnable {
+public class TwitterRetweetsTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetweetsTask.class);
 
@@ -76,4 +78,9 @@ public class TwitterRetweetsTask implements Runnable {
 
   }
 
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return provider.providerQueue.stream().map(x -> (Tweet)x.getDocument()).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index d9a0759..5512d35 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -25,11 +25,14 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration;
 import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,10 +57,13 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,7 +77,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 /**
  * Retrieve recent posts from a list of user ids or names.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterTimelineProvider";
 
@@ -95,14 +101,17 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
   protected int idsCount;
   protected Twitter client;
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
 
+  StreamsConfiguration streamsConfiguration;
+
   protected final AtomicBoolean running = new AtomicBoolean();
 
-  protected List<ListenableFuture<Object>> futures = new ArrayList<>();
+  private List<Callable<Object>> tasks = new ArrayList<>();
+  private List<Future<Object>> futures = new ArrayList<>();
 
   /**
    * To use from command line:
@@ -193,9 +202,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
       this.config = (TwitterTimelineProviderConfiguration)configurationObject;
     }
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -233,7 +244,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+      ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -241,16 +252,25 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     submitTimelineThreads(ids, names);
 
+    LOGGER.info("tasks: {}", tasks.size());
+    LOGGER.info("futures: {}", futures.size());
+
   }
 
   @Override
   public void startStream() {
 
-    LOGGER.debug("{} startStream", STREAMS_ID);
+    Objects.requireNonNull(executor);
+
+    LOGGER.info("startStream");
 
     running.set(true);
 
-    executor.shutdown();
+    LOGGER.info("running: {}", running.get());
+
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+    LOGGER.info("running: {}", running.get());
 
   }
 
@@ -260,27 +280,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setUserId(id);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
     for (String name : names) {
       StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
       request.setScreenName(name);
       request.setCount(config.getPageSize());
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+      Callable providerTask = new TwitterTimelineProviderTask(
           this,
           client,
           request
       );
-      ListenableFuture future = executor.submit(providerTask);
+      LOGGER.info("Thread Created: {}", request);
+      tasks.add(providerTask);
+      Future future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      LOGGER.info("Thread Submitted: {}", request);
     }
   }
 
@@ -294,8 +318,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -312,10 +335,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<StreamsDatum>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -338,35 +357,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
-  }
-
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
   }
 
   @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+    LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+    LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+    LOGGER.debug("tasks.size(): {}", tasks.size());
+    LOGGER.debug("futures.size(): {}", futures.size());
+    if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
       running.set(false);
-      LOGGER.info("Exiting");
     }
+    LOGGER.debug("isRunning: ", running.get());
     return running.get();
   }
+
+  @Override
+  public Iterator<Tweet> call() {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (Tweet)x.getDocument()).iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index f8816e0..6bc9822 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -30,14 +30,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
  *  Retrieve recent posts for a single user id.
  */
-public class TwitterTimelineProviderTask implements Runnable {
+public class TwitterTimelineProviderTask implements Callable<Iterator<Tweet>>, Runnable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class);
 
@@ -46,6 +49,7 @@ public class TwitterTimelineProviderTask implements Runnable {
   protected TwitterTimelineProvider provider;
   protected Twitter client;
   protected StatusesUserTimelineRequest request;
+  protected List<Tweet> responseList;
 
   /**
    * TwitterTimelineProviderTask constructor.
@@ -68,10 +72,14 @@ public class TwitterTimelineProviderTask implements Runnable {
 
     LOGGER.info("Thread Starting: {}", request.toString());
 
+    responseList = new ArrayList<>();
+
     do {
 
       List<Tweet> statuses = client.userTimeline(request);
 
+      responseList.addAll(statuses);
+
       last_count = statuses.size();
       if( statuses.size() > 0 ) {
 
@@ -106,5 +114,9 @@ public class TwitterTimelineProviderTask implements Runnable {
   }
 
 
-
+  @Override
+  public Iterator<Tweet> call() throws Exception {
+    run();
+    return responseList.iterator();
+  }
 }
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 90cd23d..5e9970a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -25,12 +25,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.util.ExecutorUtils;
+import org.apache.streams.core.util.QueueUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.config.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.config.TwitterUserInformationConfiguration;
 import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -56,10 +59,12 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -73,7 +78,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 /**
  * Retrieve current profile status from a list of user ids or names.
  */
-public class TwitterUserInformationProvider implements StreamsProvider, Serializable {
+public class TwitterUserInformationProvider implements Callable<Iterator<User>>, StreamsProvider, Serializable {
 
   private static final String STREAMS_ID = "TwitterUserInformationProvider";
 
@@ -92,6 +97,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   protected volatile Queue<StreamsDatum> providerQueue;
 
+  StreamsConfiguration streamsConfiguration;
+
   public TwitterUserInformationConfiguration getConfig() {
     return config;
   }
@@ -171,11 +178,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
   }
 
   // TODO: this should be abstracted out
-  public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
-    return new ThreadPoolExecutor(numThreads, numThreads,
-        5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-  }
+
 
   /**
    * TwitterUserInformationProvider constructor.
@@ -214,7 +217,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     Objects.requireNonNull(config.getInfo());
     Objects.requireNonNull(config.getThreadsPerProvider());
 
-    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
 
     Objects.requireNonNull(streamsConfiguration.getQueueSize());
 
@@ -228,7 +231,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     try {
       lock.writeLock().lock();
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
     } finally {
       lock.writeLock().unlock();
     }
@@ -250,7 +253,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
 
     executor = MoreExecutors.listeningDecorator(
-        TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(
+        ExecutorUtils.newFixedThreadPoolWithQueueSize(
             config.getThreadsPerProvider().intValue(),
             streamsConfiguration.getQueueSize().intValue()
         )
@@ -332,8 +335,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     try {
       lock.writeLock().lock();
       result = new StreamsResultSet(providerQueue);
-      result.setCounter(new DatumStatusCounter());
-      providerQueue = constructQueue();
+      providerQueue = QueueUtils.constructQueue();
       LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
@@ -343,10 +345,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   }
 
-  protected Queue<StreamsDatum> constructQueue() {
-    return new LinkedBlockingQueue<>();
-  }
-
   public StreamsResultSet readNew(BigInteger sequence) {
     LOGGER.debug("{} readNew", STREAMS_ID);
     throw new NotImplementedException();
@@ -372,31 +370,23 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     return running.get();
   }
 
-  void shutdownAndAwaitTermination(ExecutorService pool) {
-    pool.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-          System.err.println("Pool did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      pool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
   protected Twitter getTwitterClient() throws InstantiationException {
     return Twitter.getInstance(config);
   }
 
   @Override
   public void cleanUp() {
-    shutdownAndAwaitTermination(executor);
+    ExecutorUtils.shutdownAndAwaitTermination(executor);
+  }
+
+  @Override
+  public Iterator<User> call() throws Exception {
+    prepare(config);
+    startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+    } while ( isRunning());
+    cleanUp();
+    return providerQueue.stream().map( x -> (User)x.getDocument()).iterator();
   }
 }

-- 
To stop receiving notification emails like this one, please contact
sblackmon@apache.org.