You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:50 UTC
[07/15] git commit: STREAMS-83 | Updated twitter timeline provider
with running method
STREAMS-83 | Updated twitter timeline provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1327b3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1327b3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1327b3a3
Branch: refs/heads/master
Commit: 1327b3a38036e78af427adb5d7b122882ce49cc7
Parents: 82858eb
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:16:47 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:16:47 2014 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterTimelineProvider.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1327b3a3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 27ba777..9f9d524 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -42,6 +42,7 @@ import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Iterator;
@@ -86,6 +87,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
protected DateTime start;
protected DateTime end;
+ protected final AtomicBoolean running = new AtomicBoolean();
+
Boolean jsonStoreEnabled;
Boolean includeEntitiesEnabled;
@@ -225,7 +228,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
} finally {
lock.writeLock().unlock();
}
-
+ running.set(false);
LOGGER.info("Exiting");
return result;
@@ -246,6 +249,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
throw new NotImplementedException();
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
@@ -269,7 +277,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public void prepare(Object o) {
executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
+ running.set(true);
try {
lock.writeLock().lock();
providerQueue = constructQueue();