You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:46 UTC

[03/15] git commit: STREAMS-83 | Updated Twitter user information provider with running method

STREAMS-83 | Updated Twitter user information provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/673b38ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/673b38ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/673b38ba

Branch: refs/heads/master
Commit: 673b38ba41212850794847dc71efcd782933fa17
Parents: a8ae193
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:27 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:27 2014 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterUserInformationProvider.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/673b38ba/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index f773e3b..b69c937 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TwitterUserInformationProvider implements StreamsProvider, Serializable
 {
@@ -71,6 +72,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     protected DateTime start;
     protected DateTime end;
 
+    protected final AtomicBoolean running = new AtomicBoolean();
+
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -104,7 +107,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     @Override
     public void startStream() {
-        // no op
+        running.set(true);
     }
 
 
@@ -180,6 +183,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         LOGGER.info("Providing {} docs", providerQueue.size());
 
         StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        running.set(false);
 
         LOGGER.info("Exiting");
 
@@ -201,6 +205,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         return result;
     }
 
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
     void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {