You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/21 16:26:12 UTC
incubator-streams git commit: wrap up STREAMS-403 and STREAMS-425
Repository: incubator-streams
Updated Branches:
refs/heads/master 11e3a0f1b -> 48d54c290
wrap up STREAMS-403 and STREAMS-425
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48d54c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48d54c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48d54c29
Branch: refs/heads/master
Commit: 48d54c2902050e2f76f0ea1618da7cc1cadbd574
Parents: 11e3a0f
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 21 11:25:57 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 21 11:25:57 2016 -0500
----------------------------------------------------------------------
.../provider/InstagramAbstractProvider.java | 42 +++++++++++++++-----
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48d54c29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 8bbd900..fe4b8da 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -16,6 +16,10 @@ package org.apache.streams.instagram.provider;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -31,12 +35,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,8 +61,11 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
protected InstagramConfiguration config;
private InstagramDataCollector dataCollector;
- protected Queue<StreamsDatum> dataQueue; //exposed for testing
- private ExecutorService executorService;
+ protected Queue<StreamsDatum> dataQueue;
+ private ListeningExecutorService executorService;
+
+ List<ListenableFuture<Object>> futures = new ArrayList<>();
+
private AtomicBoolean isCompleted;
public InstagramAbstractProvider() {
@@ -65,6 +76,12 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
this.config = SerializationUtil.cloneBySerialization(config);
}
+ public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
@Override
public String getId() {
return STREAMS_ID;
@@ -73,8 +90,9 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
@Override
public void startStream() {
this.dataCollector = getInstagramDataCollector();
- this.executorService = Executors.newSingleThreadExecutor();
- this.executorService.submit(this.dataCollector);
+ this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+ ListenableFuture future = this.executorService.submit(this.dataCollector);
+ this.futures.add(future);
}
/**
@@ -92,7 +110,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch);
++count;
}
- this.isCompleted.set(batch.size() == 0 && this.dataQueue.isEmpty() && this.dataCollector.isCompleted());
return new StreamsResultSet(batch);
}
@@ -107,11 +124,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
}
@Override
- public boolean isRunning() {
- return !this.isCompleted.get();
- }
-
- @Override
public void prepare(Object configurationObject) {
this.dataQueue = Queues.newConcurrentLinkedQueue();
this.isCompleted = new AtomicBoolean(false);
@@ -204,4 +216,14 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
return usersInfo;
}
+ @Override
+ public boolean isRunning() {
+ if (dataQueue.isEmpty() && executorService.isTerminated() && Futures.allAsList(futures).isDone()) {
+ LOGGER.info("Completed");
+ isCompleted.set(true);
+ LOGGER.info("Exiting");
+ }
+ return !isCompleted.get();
+ }
+
}