You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/01 14:22:57 UTC

[5/5] git commit: STREAMS-135 | Updated counting model and locking

STREAMS-135 | Updated counting model and locking


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2d59a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2d59a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2d59a26

Branch: refs/heads/master
Commit: c2d59a268ccf905e65286d0ad4b027fbc59d3dbb
Parents: c3ae9c8
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 30 11:48:04 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 30 11:48:04 2014 -0400

----------------------------------------------------------------------
 .../twitter/provider/TwitterStreamProvider.java  | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2d59a26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 98d0acb..17f13e9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -58,6 +58,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     public final static String STREAMS_ID = "TwitterStreamProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
+    public static final int MAX_BATCH = 1000;
 
     private TwitterStreamConfiguration config;
 
@@ -77,7 +78,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     protected BasicClient client;
     protected AtomicBoolean running = new AtomicBoolean(false);
     protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private DatumStatusCounter countersCurrent = new DatumStatusCounter();
     private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
@@ -106,9 +106,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     public synchronized StreamsResultSet readCurrent() {
 
         StreamsResultSet current;
-
-        try {
-            lock.writeLock().lock();
+        synchronized(this) {
             Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
             drainTo(drain);
             current = new StreamsResultSet(drain);
@@ -116,8 +114,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
             current.getCounter().add(countersCurrent);
             countersTotal.add(countersCurrent);
             countersCurrent = new DatumStatusCounter();
-        } finally {
-            lock.writeLock().unlock();
         }
 
         return current;
@@ -212,7 +208,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
         LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
 
-        providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(1000);
+        providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(MAX_BATCH);
 
         client = new ClientBuilder()
             .name("apache/streams/streams-contrib/streams-provider-twitter")
@@ -239,21 +235,22 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
     protected boolean addDatum(Future<List<StreamsDatum>> future) {
         try {
-            lock.readLock().lock();
             ComponentUtils.offerUntilSuccess(future, providerQueue);
+            countersCurrent.incrementStatus(DatumStatus.SUCCESS);
             return true;
         } catch (Exception e) {
+            countersCurrent.incrementStatus(DatumStatus.FAIL);
             LOGGER.warn("Unable to enqueue item from Twitter stream");
             return false;
-        }finally {
-            lock.readLock().unlock();
         }
     }
 
     protected void drainTo(Queue<StreamsDatum> drain) {
-        while(!providerQueue.isEmpty()) {
+        int count = 0;
+        while(!providerQueue.isEmpty() && count <= MAX_BATCH) {
             for(StreamsDatum datum : pollForDatum()) {
                 ComponentUtils.offerUntilSuccess(datum, drain);
+                count++;
             }
         }
     }