You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/21 16:26:12 UTC

incubator-streams git commit: wrap up STREAMS-403 and STREAMS-425

Repository: incubator-streams
Updated Branches:
  refs/heads/master 11e3a0f1b -> 48d54c290


wrap up STREAMS-403 and STREAMS-425


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48d54c29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48d54c29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48d54c29

Branch: refs/heads/master
Commit: 48d54c2902050e2f76f0ea1618da7cc1cadbd574
Parents: 11e3a0f
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 21 11:25:57 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 21 11:25:57 2016 -0500

----------------------------------------------------------------------
 .../provider/InstagramAbstractProvider.java     | 42 +++++++++++++++-----
 1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48d54c29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 8bbd900..fe4b8da 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -16,6 +16,10 @@ package org.apache.streams.instagram.provider;
 
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -31,12 +35,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -53,8 +61,11 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
 
     protected InstagramConfiguration config;
     private InstagramDataCollector dataCollector;
-    protected Queue<StreamsDatum> dataQueue; //exposed for testing
-    private ExecutorService executorService;
+    protected Queue<StreamsDatum> dataQueue;
+    private ListeningExecutorService executorService;
+
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
     private AtomicBoolean isCompleted;
 
     public InstagramAbstractProvider() {
@@ -65,6 +76,12 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
         this.config = SerializationUtil.cloneBySerialization(config);
     }
 
+    public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
     @Override
     public String getId() {
         return STREAMS_ID;
@@ -73,8 +90,9 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
     @Override
     public void startStream() {
         this.dataCollector = getInstagramDataCollector();
-        this.executorService = Executors.newSingleThreadExecutor();
-        this.executorService.submit(this.dataCollector);
+        this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        ListenableFuture future = this.executorService.submit(this.dataCollector);
+        this.futures.add(future);
     }
 
     /**
@@ -92,7 +110,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
             ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch);
             ++count;
         }
-        this.isCompleted.set(batch.size() == 0 && this.dataQueue.isEmpty() && this.dataCollector.isCompleted());
         return new StreamsResultSet(batch);
     }
 
@@ -107,11 +124,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
     }
 
     @Override
-    public boolean isRunning() {
-        return !this.isCompleted.get();
-    }
-
-    @Override
     public void prepare(Object configurationObject) {
         this.dataQueue = Queues.newConcurrentLinkedQueue();
         this.isCompleted = new AtomicBoolean(false);
@@ -204,4 +216,14 @@ public abstract class InstagramAbstractProvider implements StreamsProvider {
         return usersInfo;
     }
 
+    @Override
+    public boolean isRunning() {
+        if (dataQueue.isEmpty() && executorService.isTerminated() && Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            isCompleted.set(true);
+            LOGGER.info("Exiting");
+        }
+        return !isCompleted.get();
+    }
+
 }