You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/01 14:22:55 UTC

[3/5] git commit: STREAMS-135 | Reduced number of serialization/deserialization steps

STREAMS-135 | Reduced number of serialization/deserialization steps


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/04054d43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/04054d43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/04054d43

Branch: refs/heads/master
Commit: 04054d43664267f51dda59996c8698a7c4a6e7c4
Parents: 28406d9
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 30 08:17:57 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 30 08:17:57 2014 -0400

----------------------------------------------------------------------
 .../processor/TwitterEventProcessor.java        | 26 ++------------
 .../provider/TwitterStreamProcessor.java        | 37 ++++++++++++++++----
 .../twitter/provider/TwitterStreamProvider.java | 20 +++++++++--
 3 files changed, 50 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 3a42af9..fb4615f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -49,7 +49,7 @@ import java.util.concurrent.Callable;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterEventProcessor implements StreamsProcessor, Callable<List<StreamsDatum>> {
+public class TwitterEventProcessor implements StreamsProcessor {
 
     private final static String STREAMS_ID = "TwitterEventProcessor";
 
@@ -59,36 +59,16 @@ public class TwitterEventProcessor implements StreamsProcessor, Callable<List<St
 
     private Class inClass;
     private Class outClass;
-    private String item;
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
 
-    public TwitterEventProcessor(Class inClass, Class outClass, String item) {
+    public TwitterEventProcessor(Class inClass, Class outClass) {
         this.inClass = inClass;
         this.outClass = outClass;
-        this.item = item;
-    }
-
-    public TwitterEventProcessor(Class inClass, Class outClass) {
-        this(inClass, outClass, null);
     }
 
     public TwitterEventProcessor( Class outClass) {
-        this(null, outClass, null);
-    }
-
-    public TwitterEventProcessor( Class outClass, String item) {
-        this(null, outClass, item);
-    }
-
-    @Override
-    public List<StreamsDatum> call() throws Exception {
-        if(item != null) {
-            ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-            StreamsDatum rawDatum = new StreamsDatum(objectNode);
-            return process(rawDatum);
-        }
-        return Lists.newArrayList();
+        this(null, outClass);
     }
 
     public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
index ec9ecc6..8fe67b5 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java
@@ -18,20 +18,21 @@
 
 package org.apache.streams.twitter.provider;
 
-import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
-import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import org.apache.streams.twitter.processor.TwitterEventProcessor;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -64,10 +65,32 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor {
             Thread.sleep(10);
         } while(msg == null);
 
-        return provider.addDatum(service.submit(new TwitterEventProcessor(String.class, msg)));
+        //Deserializing to an ObjectNode can take time.  Parallelize the task to improve throughput
+        return provider.addDatum(service.submit(new StreamDeserializer(msg)));
     }
 
     public void cleanUp() {
         ComponentUtils.shutdownExecutor(service, 1, 30);
     }
+
+    protected static class StreamDeserializer implements Callable<List<StreamsDatum>> {
+
+        protected static final ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+        protected String item;
+
+        public StreamDeserializer(String item) {
+            this.item = item;
+        }
+
+        @Override
+        public List<StreamsDatum> call() throws Exception {
+            if(item != null) {
+                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+                return Lists.newArrayList(rawDatum);
+            }
+            return Lists.newArrayList();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/04054d43/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 9e1ce45..fb1e55f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -47,6 +47,8 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Created by sblackmon on 12/10/13.
@@ -75,6 +77,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     protected BasicClient client;
     protected AtomicBoolean running = new AtomicBoolean(false);
     protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private DatumStatusCounter countersCurrent = new DatumStatusCounter();
     private DatumStatusCounter countersTotal = new DatumStatusCounter();
 
@@ -104,7 +107,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
         StreamsResultSet current;
 
-        synchronized( TwitterStreamProvider.class ) {
+        try {
+            lock.writeLock().lock();
             Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
             drainTo(drain);
             current = new StreamsResultSet(drain);
@@ -112,6 +116,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
             current.getCounter().add(countersCurrent);
             countersTotal.add(countersCurrent);
             countersCurrent = new DatumStatusCounter();
+        } finally {
+            lock.writeLock().unlock();
         }
 
         return current;
@@ -232,8 +238,16 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     }
 
     protected boolean addDatum(Future<List<StreamsDatum>> future) {
-        ComponentUtils.offerUntilSuccess(future, providerQueue);
-        return true;
+        try {
+            lock.readLock().lock();
+            ComponentUtils.offerUntilSuccess(future, providerQueue);
+            return true;
+        } catch (Exception e) {
+            LOGGER.warn("Unable to enqueue item from Twitter stream");
+            return false;
+        }finally {
+            lock.readLock().unlock();
+        }
     }
 
     protected void drainTo(Queue<StreamsDatum> drain) {