You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/01 14:22:57 UTC
[5/5] git commit: STREAMS-135 | Updated counting model and locking
STREAMS-135 | Updated counting model and locking
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2d59a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2d59a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2d59a26
Branch: refs/heads/master
Commit: c2d59a268ccf905e65286d0ad4b027fbc59d3dbb
Parents: c3ae9c8
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 30 11:48:04 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 30 11:48:04 2014 -0400
----------------------------------------------------------------------
.../twitter/provider/TwitterStreamProvider.java | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2d59a26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 98d0acb..17f13e9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -58,6 +58,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
public final static String STREAMS_ID = "TwitterStreamProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
+ public static final int MAX_BATCH = 1000;
private TwitterStreamConfiguration config;
@@ -77,7 +78,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
protected BasicClient client;
protected AtomicBoolean running = new AtomicBoolean(false);
protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
private DatumStatusCounter countersCurrent = new DatumStatusCounter();
private DatumStatusCounter countersTotal = new DatumStatusCounter();
@@ -106,9 +106,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
public synchronized StreamsResultSet readCurrent() {
StreamsResultSet current;
-
- try {
- lock.writeLock().lock();
+ synchronized(this) {
Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
drainTo(drain);
current = new StreamsResultSet(drain);
@@ -116,8 +114,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
countersCurrent = new DatumStatusCounter();
- } finally {
- lock.writeLock().unlock();
}
return current;
@@ -212,7 +208,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
- providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(1000);
+ providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(MAX_BATCH);
client = new ClientBuilder()
.name("apache/streams/streams-contrib/streams-provider-twitter")
@@ -239,21 +235,22 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
protected boolean addDatum(Future<List<StreamsDatum>> future) {
try {
- lock.readLock().lock();
ComponentUtils.offerUntilSuccess(future, providerQueue);
+ countersCurrent.incrementStatus(DatumStatus.SUCCESS);
return true;
} catch (Exception e) {
+ countersCurrent.incrementStatus(DatumStatus.FAIL);
LOGGER.warn("Unable to enqueue item from Twitter stream");
return false;
- }finally {
- lock.readLock().unlock();
}
}
protected void drainTo(Queue<StreamsDatum> drain) {
- while(!providerQueue.isEmpty()) {
+ int count = 0;
+ while(!providerQueue.isEmpty() && count <= MAX_BATCH) {
for(StreamsDatum datum : pollForDatum()) {
ComponentUtils.offerUntilSuccess(datum, drain);
+ count++;
}
}
}