You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:46 UTC
[03/15] git commit: STREAMS-83 | Updated Twitter user information provider with running method
STREAMS-83 | Updated Twitter user information provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/673b38ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/673b38ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/673b38ba
Branch: refs/heads/master
Commit: 673b38ba41212850794847dc71efcd782933fa17
Parents: a8ae193
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:27 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:27 2014 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterUserInformationProvider.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/673b38ba/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index f773e3b..b69c937 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TwitterUserInformationProvider implements StreamsProvider, Serializable
{
@@ -71,6 +72,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
protected DateTime start;
protected DateTime end;
+ protected final AtomicBoolean running = new AtomicBoolean();
+
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@ -104,7 +107,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
@Override
public void startStream() {
- // no op
+ running.set(true);
}
@@ -180,6 +183,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
LOGGER.info("Providing {} docs", providerQueue.size());
StreamsResultSet result = new StreamsResultSet(providerQueue);
+ running.set(false);
LOGGER.info("Exiting");
@@ -201,6 +205,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
return result;
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {