You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 15:43:59 UTC

[5/9] git commit: STREAMS-86 | Added a Reentrant Read/Write lock to ensure that the provider queue is only accessed when appropriate. This helps to make sure that we don't drop data between this provider and any subsequent components

STREAMS-86 | Added a Reentrant Read/Write lock to ensure that the provider queue is only accessed when appropriate. This helps to make sure that we don't drop data between this provider and any subsequent components


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b48c03d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b48c03d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b48c03d2

Branch: refs/heads/master
Commit: b48c03d2498b13e3ea6eb85906e1c616249ac174
Parents: 252801c
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon May 19 13:33:47 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Tue May 27 09:33:20 2014 -0400

----------------------------------------------------------------------
 .../provider/TwitterTimelineProvider.java       | 36 ++++++++++++++++----
 1 file changed, 29 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b48c03d2/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index e4b0460..f5b5d24 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,12 +2,17 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
@@ -24,6 +29,10 @@ import twitter4j.json.DataObjectFactory;
 import java.io.Serializable;
 import java.lang.Math;
 import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +54,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     private TwitterStreamConfiguration config;
 
     private Class klass;
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
 
     public TwitterStreamConfiguration getConfig() {
         return config;
@@ -54,12 +64,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         this.config = config;
     }
 
-    protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(5000));
+    protected volatile Queue<StreamsDatum> providerQueue;
 
     protected int idsCount;
     protected Twitter client;
     protected Iterator<Long> ids;
 
+    ListenableFuture providerTaskComplete;
+
     protected ListeningExecutorService executor;
 
     protected DateTime start;
@@ -116,14 +128,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             int keepTrying = 0;
 
             // keep trying to load, give it 5 attempts.
-            //while (keepTrying < 10)
-            while (keepTrying < 1)
+            while (keepTrying < 5)
             {
 
                 try
                 {
                     statuses = client.getUserTimeline(currentId, paging);
-
                     for (Status tStat : statuses) {
                         String json = TwitterObjectFactory.getRawJSON(tStat);
                         ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
@@ -172,6 +182,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         LOGGER.debug("{} readCurrent", STREAMS_ID);
 
         Preconditions.checkArgument(ids.hasNext());
+        StreamsResultSet result;
 
         StreamsResultSet current;
 
@@ -189,7 +200,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
         LOGGER.info("Providing {} docs", providerQueue.size());
 
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        try {
+            lock.writeLock().lock();
+            result = new StreamsResultSet(providerQueue);
+            result.setCounter(new DatumStatusCounter());
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
 
         LOGGER.info("Exiting");
 
@@ -197,6 +215,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     }
 
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+    }
+
     public StreamsResultSet readNew(BigInteger sequence) {
         LOGGER.debug("{} readNew", STREAMS_ID);
         throw new NotImplementedException();
@@ -248,7 +270,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         client = getTwitterClient();
     }
 
-    
+
     protected Twitter getTwitterClient()
     {
         String baseUrl = "https://api.twitter.com:443/1.1/";
@@ -272,4 +294,4 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public void cleanUp() {
         shutdownAndAwaitTermination(executor);
     }
-}
+}
\ No newline at end of file