You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/01 14:22:55 UTC
[3/5] git commit: STREAMS-135 | Reduced number of serialization/deserialization steps
STREAMS-135 | Reduced number of serialization/deserialization steps
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/04054d43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/04054d43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/04054d43
Branch: refs/heads/master
Commit: 04054d43664267f51dda59996c8698a7c4a6e7c4
Parents: 28406d9
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 30 08:17:57 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 30 08:17:57 2014 -0400
----------------------------------------------------------------------
.../processor/TwitterEventProcessor.java | 26 ++------------
.../provider/TwitterStreamProcessor.java | 37 ++++++++++++++++----
.../twitter/provider/TwitterStreamProvider.java | 20 +++++++++--
3 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 3a42af9..fb4615f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -49,7 +49,7 @@ import java.util.concurrent.Callable;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterEventProcessor implements StreamsProcessor, Callable<List<StreamsDatum>> {
+public class TwitterEventProcessor implements StreamsProcessor {
private final static String STREAMS_ID = "TwitterEventProcessor";
@@ -59,36 +59,16 @@ public class TwitterEventProcessor implements StreamsProcessor, Callable<List<St
private Class inClass;
private Class outClass;
- private String item;
private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
- public TwitterEventProcessor(Class inClass, Class outClass, String item) {
+ public TwitterEventProcessor(Class inClass, Class outClass) {
this.inClass = inClass;
this.outClass = outClass;
- this.item = item;
- }
-
- public TwitterEventProcessor(Class inClass, Class outClass) {
- this(inClass, outClass, null);
}
public TwitterEventProcessor( Class outClass) {
- this(null, outClass, null);
- }
-
- public TwitterEventProcessor( Class outClass, String item) {
- this(null, outClass, item);
- }
-
- @Override
- public List<StreamsDatum> call() throws Exception {
- if(item != null) {
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
- return process(rawDatum);
- }
- return Lists.newArrayList();
+ this(null, outClass);
}
public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index ec9ecc6..8fe67b5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -18,20 +18,21 @@
package org.apache.streams.twitter.provider;
-import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
-import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
-import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
*
@@ -64,10 +65,32 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
Thread.sleep(10);
} while(msg == null);
- return provider.addDatum(service.submit(new TwitterEventProcessor(String.class, msg)));
+ //Deserializing to an ObjectNode can take time. Parallelize the task to improve throughput
+ return provider.addDatum(service.submit(new StreamDeserializer(msg)));
}
public void cleanUp() {
ComponentUtils.shutdownExecutor(service, 1, 30);
}
+
+ protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
+
+ protected static final ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+ protected String item;
+
+ public StreamDeserializer(String item) {
+ this.item = item;
+ }
+
+ @Override
+ public List<StreamsDatum> call() throws Exception {
+ if(item != null) {
+ ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+ StreamsDatum rawDatum = new StreamsDatum(objectNode);
+ return Lists.newArrayList(rawDatum);
+ }
+ return Lists.newArrayList();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 9e1ce45..fb1e55f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -47,6 +47,8 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Created by sblackmon on 12/10/13.
@@ -75,6 +77,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
protected BasicClient client;
protected AtomicBoolean running = new AtomicBoolean(false);
protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private DatumStatusCounter countersCurrent = new DatumStatusCounter();
private DatumStatusCounter countersTotal = new DatumStatusCounter();
@@ -104,7 +107,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
StreamsResultSet current;
- synchronized( TwitterStreamProvider.class ) {
+ try {
+ lock.writeLock().lock();
Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
drainTo(drain);
current = new StreamsResultSet(drain);
@@ -112,6 +116,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
countersCurrent = new DatumStatusCounter();
+ } finally {
+ lock.writeLock().unlock();
}
return current;
@@ -232,8 +238,16 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
}
protected boolean addDatum(Future<List<StreamsDatum>> future) {
- ComponentUtils.offerUntilSuccess(future, providerQueue);
- return true;
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(future, providerQueue);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("Unable to enqueue item from Twitter stream");
+ return false;
+ }finally {
+ lock.readLock().unlock();
+ }
}
protected void drainTo(Queue<StreamsDatum> drain) {