You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/17 22:28:09 UTC

[36/53] [abbrv] git commit: Merging ryan and matt's changes. Tweaks to enable twitter-userstream-local

Merging Ryan's and Matt's changes.
Tweaks to enable twitter-userstream-local.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1518b3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1518b3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1518b3d

Branch: refs/heads/master
Commit: f1518b3ddcc798c7c4baae9a16667678e2554b55
Parents: 8883b43
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 12:17:48 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 12:17:48 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 streams-contrib/pom.xml                         |  1 +
 .../processor/TwitterEventProcessor.java        | 65 ++++++++++++--------
 .../twitter/processor/TwitterTypeConverter.java | 48 ++-------------
 .../provider/TwitterEventClassifier.java        | 42 ++++++++++---
 .../serializer/StreamsTwitterMapper.java        |  2 +-
 .../streams/twitter/test/SimpleTweetTest.java   | 14 ++++-
 .../streams/jackson/StreamsJacksonModule.java   |  1 -
 .../streams/local/builders/StreamComponent.java | 16 +++--
 .../streams/local/tasks/BaseStreamsTask.java    |  3 +-
 streams-util/pom.xml                            |  4 ++
 11 files changed, 110 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d8a21df..01f2c2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
         <kafka.version>0.8.1</kafka.version>
         <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
         <netty.version>3.8.0.Final</netty.version>
-        <json-path.version>0.9.0</json-path.version>
+        <json-path.version>0.9.1</json-path.version>
         <build-helper.version>1.8</build-helper.version>
     </properties>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 38f02f6..2d2d27c 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <module>streams-processor-tika</module>
         <module>streams-processor-urls</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 2f2194f..abc0c1a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.exceptions.ActivitySerializerException;
@@ -32,7 +33,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
 
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+    private ObjectMapper mapper = new StreamsTwitterMapper();
 
     private BlockingQueue<String> inQueue;
     private Queue<StreamsDatum> outQueue;
@@ -64,18 +65,22 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
         while(true) {
             String item;
             try {
-                item = inQueue.poll();
+                item = inQueue.take();
                 if(item instanceof String && item.equals(TERMINATE)) {
                     LOGGER.info("Terminating!");
                     break;
                 }
 
-                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+                System.out.println(item);
 
-                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+                if( StringUtils.isNotEmpty(item) ) {
+                    ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
 
-                for( StreamsDatum entry : process(rawDatum)) {
-                    outQueue.offer(entry);
+                    StreamsDatum rawDatum = new StreamsDatum(objectNode);
+
+                    for (StreamsDatum entry : process(rawDatum)) {
+                        outQueue.offer(entry);
+                    }
                 }
 
             } catch (Exception e) {
@@ -166,29 +171,37 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
         LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
 
-        String json = node.asText();
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(node);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
 
-        // since data is coming from outside provider, we don't know what type the events are
-        Class inClass = TwitterEventClassifier.detectClass(json);
+        if( StringUtils.isNotEmpty(json)) {
+
+            // since data is coming from outside provider, we don't know what type the events are
+            Class inClass = TwitterEventClassifier.detectClass(json);
+
+            // if the target is string, just pass-through
+            if (java.lang.String.class.equals(outClass))
+                return Lists.newArrayList(new StreamsDatum(json));
+            else {
+                // convert to desired format
+                Object out = null;
+                try {
+                    out = convert(node, inClass, outClass);
+                } catch (ActivitySerializerException e) {
+                    LOGGER.warn("Failed deserializing", e);
+                    return Lists.newArrayList();
+                } catch (JsonProcessingException e) {
+                    LOGGER.warn("Failed parsing JSON", e);
+                    return Lists.newArrayList();
+                }
 
-        // if the target is string, just pass-through
-        if( java.lang.String.class.equals(outClass))
-            return Lists.newArrayList(new StreamsDatum(json));
-        else {
-            // convert to desired format
-            Object out = null;
-            try {
-                out = convert(node, inClass, outClass);
-            } catch (ActivitySerializerException e) {
-                LOGGER.warn("Failed deserializing", e);
-                return Lists.newArrayList();
-            } catch (JsonProcessingException e) {
-                LOGGER.warn("Failed parsing JSON", e);
-                return Lists.newArrayList();
+                if (out != null && validate(out, outClass))
+                    return Lists.newArrayList(new StreamsDatum(out));
             }
-
-            if( out != null && validate(out, outClass))
-                return Lists.newArrayList(new StreamsDatum(out));
         }
 
         return Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 60f2ae7..1c1e2fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -14,10 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = new StreamsTwitterMapper();
 
     private Queue<StreamsDatum> inQueue;
     private Queue<StreamsDatum> outQueue;
@@ -42,9 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
     private Class inClass;
     private Class outClass;
 
-    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -66,21 +61,10 @@ public class TwitterTypeConverter implements StreamsProcessor {
         Object result = null;
 
         if( outClass.equals( Activity.class )) {
-            if( inClass.equals( Delete.class )) {
-                LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.deserialize(
+                LOGGER.debug("ACTIVITY");
+                result = twitterJsonActivitySerializer.deserialize(
                         mapper.writeValueAsString(event));
-            } else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-            } else if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-            } else {
-                return null;
-            }
+                return result;
         } else if( outClass.equals( Tweet.class )) {
             if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("TWEET");
@@ -200,24 +184,4 @@ public class TwitterTypeConverter implements StreamsProcessor {
 
     }
 
-//    public void run() {
-//        while(true) {
-//            StreamsDatum item;
-//            try {
-//                item = inQueue.poll();
-//                if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-//                    LOGGER.info("Terminating!");
-//                    break;
-//                }
-//
-//                for( StreamsDatum entry : process(item)) {
-//                    outQueue.offer(entry);
-//                }
-//
-//            } catch (Exception e) {
-//                e.printStackTrace();
-//
-//            }
-//        }
-//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index d31c346..b577e42 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -1,9 +1,15 @@
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
 import com.jayway.jsonassert.JsonAssert;
+import org.apache.commons.lang.StringUtils;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
 
 /**
  * Created by sblackmon on 12/13/13.
@@ -12,18 +18,36 @@ public class TwitterEventClassifier {
 
     public static Class detectClass( String json ) {
 
-        try {
-            JsonAssert.with(json).assertNull("$.delete");
-        } catch( AssertionError ae ) {
-            return Delete.class;
-        }
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
+//        try {
+//            JsonAssert.with(json).assertNull("$.delete");
+//        } catch( AssertionError ae ) {
+//            return Delete.class;
+//        }
+//
+//        try {
+//            JsonAssert.with(json).assertNull("$.retweeted_status");
+//        } catch( AssertionError ae ) {
+//            return Retweet.class;
+//        }
+//
+//        return Tweet.class;
+
+        ObjectNode objectNode;
         try {
-            JsonAssert.with(json).assertNull("$.retweeted_status");
-        } catch( AssertionError ae ) {
-            return Retweet.class;
+            objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
         }
 
-        return Tweet.class;
+        if( objectNode.findValue("retweeted_status") != null )
+            return Retweet.class;
+        else if( objectNode.findValue("delete") != null )
+            return Delete.class;
+        else
+            return Tweet.class;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 004e174..6b61036 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -30,7 +30,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
         return INSTANCE;
     }
 
-    private StreamsTwitterMapper() {
+    public StreamsTwitterMapper() {
         super();
         registerModule(new SimpleModule()
         {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 8988de0..31ddfce 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -3,14 +3,13 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.junit.Assert;
@@ -82,6 +81,15 @@ public class SimpleTweetTest {
             Assert.fail();
         }
 
+        try {
+            TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+            converter.prepare(null);
+            converter.process(new StreamsDatum(TWITTER_JSON));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
         assertThat(activity, is(not(nullValue())));
 
         assertThat(activity.getId(), is(not(nullValue())));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 4c8a5aa..323e40c 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -14,5 +14,4 @@ public class StreamsJacksonModule extends SimpleModule {
         addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 6319ba8..f5e9978 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -165,10 +165,18 @@ public class StreamComponent {
     public StreamsTask createConnectedTask() {
         StreamsTask task;
         if(this.processor != null) {
-            task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
-            task.addInputQueue(this.inQueue);
-            for(Queue<StreamsDatum> q : this.outBound.values()) {
-                task.addOutputQueue(q);
+            if(this.numTasks > 1) {
+                task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+                task.addInputQueue(this.inQueue);
+                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                    task.addOutputQueue(q);
+                }
+            } else {
+                task = new StreamsProcessorTask(this.processor);
+                task.addInputQueue(this.inQueue);
+                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                    task.addOutputQueue(q);
+                }
             }
         }
         else if(this.writer != null) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 3799480..694cb76 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.local.tasks;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.util.SerializationUtil;
 import org.slf4j.Logger;
@@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
     private ObjectMapper mapper;
 
     public BaseStreamsTask() {
-        this.mapper = new ObjectMapper();
+        this.mapper = new StreamsJacksonMapper();
         this.mapper.registerSubtypes(Activity.class);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-util/pom.xml
----------------------------------------------------------------------
diff --git a/streams-util/pom.xml b/streams-util/pom.xml
index cd6d031..0a48ec9 100644
--- a/streams-util/pom.xml
+++ b/streams-util/pom.xml
@@ -39,6 +39,10 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
 
     </dependencies>
 </project>
\ No newline at end of file