You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:10 UTC
[23/52] [abbrv] git commit: Merging ryan and matt's changes Tweaks to
enable twitter-userstream-local
Merging Ryan's and Matt's changes.
Tweaks to enable twitter-userstream-local.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1518b3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1518b3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1518b3d
Branch: refs/heads/sblackmon
Commit: f1518b3ddcc798c7c4baae9a16667678e2554b55
Parents: 8883b43
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 12:17:48 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 12:17:48 2014 -0500
----------------------------------------------------------------------
pom.xml | 2 +-
streams-contrib/pom.xml | 1 +
.../processor/TwitterEventProcessor.java | 65 ++++++++++++--------
.../twitter/processor/TwitterTypeConverter.java | 48 ++-------------
.../provider/TwitterEventClassifier.java | 42 ++++++++++---
.../serializer/StreamsTwitterMapper.java | 2 +-
.../streams/twitter/test/SimpleTweetTest.java | 14 ++++-
.../streams/jackson/StreamsJacksonModule.java | 1 -
.../streams/local/builders/StreamComponent.java | 16 +++--
.../streams/local/tasks/BaseStreamsTask.java | 3 +-
streams-util/pom.xml | 4 ++
11 files changed, 110 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d8a21df..01f2c2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
<kafka.version>0.8.1</kafka.version>
<zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version>
<netty.version>3.8.0.Final</netty.version>
- <json-path.version>0.9.0</json-path.version>
+ <json-path.version>0.9.1</json-path.version>
<build-helper.version>1.8</build-helper.version>
</properties>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 38f02f6..2d2d27c 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,6 +44,7 @@
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
+ <module>streams-processor-tika</module>
<module>streams-processor-urls</module>
<module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 2f2194f..abc0c1a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.exceptions.ActivitySerializerException;
@@ -32,7 +33,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
- private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+ private ObjectMapper mapper = new StreamsTwitterMapper();
private BlockingQueue<String> inQueue;
private Queue<StreamsDatum> outQueue;
@@ -64,18 +65,22 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
while(true) {
String item;
try {
- item = inQueue.poll();
+ item = inQueue.take();
if(item instanceof String && item.equals(TERMINATE)) {
LOGGER.info("Terminating!");
break;
}
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+ System.out.println(item);
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
+ if( StringUtils.isNotEmpty(item) ) {
+ ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- for( StreamsDatum entry : process(rawDatum)) {
- outQueue.offer(entry);
+ StreamsDatum rawDatum = new StreamsDatum(objectNode);
+
+ for (StreamsDatum entry : process(rawDatum)) {
+ outQueue.offer(entry);
+ }
}
} catch (Exception e) {
@@ -166,29 +171,37 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
- String json = node.asText();
+ String json = null;
+ try {
+ json = mapper.writeValueAsString(node);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(json);
+ if( StringUtils.isNotEmpty(json)) {
+
+ // since data is coming from outside provider, we don't know what type the events are
+ Class inClass = TwitterEventClassifier.detectClass(json);
+
+ // if the target is string, just pass-through
+ if (java.lang.String.class.equals(outClass))
+ return Lists.newArrayList(new StreamsDatum(json));
+ else {
+ // convert to desired format
+ Object out = null;
+ try {
+ out = convert(node, inClass, outClass);
+ } catch (ActivitySerializerException e) {
+ LOGGER.warn("Failed deserializing", e);
+ return Lists.newArrayList();
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Failed parsing JSON", e);
+ return Lists.newArrayList();
+ }
- // if the target is string, just pass-through
- if( java.lang.String.class.equals(outClass))
- return Lists.newArrayList(new StreamsDatum(json));
- else {
- // convert to desired format
- Object out = null;
- try {
- out = convert(node, inClass, outClass);
- } catch (ActivitySerializerException e) {
- LOGGER.warn("Failed deserializing", e);
- return Lists.newArrayList();
- } catch (JsonProcessingException e) {
- LOGGER.warn("Failed parsing JSON", e);
- return Lists.newArrayList();
+ if (out != null && validate(out, outClass))
+ return Lists.newArrayList(new StreamsDatum(out));
}
-
- if( out != null && validate(out, outClass))
- return Lists.newArrayList(new StreamsDatum(out));
}
return Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 60f2ae7..1c1e2fb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -14,10 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = new StreamsTwitterMapper();
private Queue<StreamsDatum> inQueue;
private Queue<StreamsDatum> outQueue;
@@ -42,9 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
private Class inClass;
private Class outClass;
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+ private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
public final static String TERMINATE = new String("TERMINATE");
@@ -66,21 +61,10 @@ public class TwitterTypeConverter implements StreamsProcessor {
Object result = null;
if( outClass.equals( Activity.class )) {
- if( inClass.equals( Delete.class )) {
- LOGGER.debug("ACTIVITY DELETE");
- result = twitterJsonDeleteActivitySerializer.deserialize(
+ LOGGER.debug("ACTIVITY");
+ result = twitterJsonActivitySerializer.deserialize(
mapper.writeValueAsString(event));
- } else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("ACTIVITY RETWEET");
- result = twitterJsonRetweetActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("ACTIVITY TWEET");
- result = twitterJsonTweetActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else {
- return null;
- }
+ return result;
} else if( outClass.equals( Tweet.class )) {
if ( inClass.equals( Tweet.class )) {
LOGGER.debug("TWEET");
@@ -200,24 +184,4 @@ public class TwitterTypeConverter implements StreamsProcessor {
}
-// public void run() {
-// while(true) {
-// StreamsDatum item;
-// try {
-// item = inQueue.poll();
-// if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-// LOGGER.info("Terminating!");
-// break;
-// }
-//
-// for( StreamsDatum entry : process(item)) {
-// outQueue.offer(entry);
-// }
-//
-// } catch (Exception e) {
-// e.printStackTrace();
-//
-// }
-// }
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index d31c346..b577e42 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -1,9 +1,15 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
import com.jayway.jsonassert.JsonAssert;
+import org.apache.commons.lang.StringUtils;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
/**
* Created by sblackmon on 12/13/13.
@@ -12,18 +18,36 @@ public class TwitterEventClassifier {
public static Class detectClass( String json ) {
- try {
- JsonAssert.with(json).assertNull("$.delete");
- } catch( AssertionError ae ) {
- return Delete.class;
- }
+ Preconditions.checkNotNull(json);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+// try {
+// JsonAssert.with(json).assertNull("$.delete");
+// } catch( AssertionError ae ) {
+// return Delete.class;
+// }
+//
+// try {
+// JsonAssert.with(json).assertNull("$.retweeted_status");
+// } catch( AssertionError ae ) {
+// return Retweet.class;
+// }
+//
+// return Tweet.class;
+
+ ObjectNode objectNode;
try {
- JsonAssert.with(json).assertNull("$.retweeted_status");
- } catch( AssertionError ae ) {
- return Retweet.class;
+ objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
}
- return Tweet.class;
+ if( objectNode.findValue("retweeted_status") != null )
+ return Retweet.class;
+ else if( objectNode.findValue("delete") != null )
+ return Delete.class;
+ else
+ return Tweet.class;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 004e174..6b61036 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -30,7 +30,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
return INSTANCE;
}
- private StreamsTwitterMapper() {
+ public StreamsTwitterMapper() {
super();
registerModule(new SimpleModule()
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 8988de0..31ddfce 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -3,14 +3,13 @@ package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
import org.junit.Assert;
@@ -82,6 +81,15 @@ public class SimpleTweetTest {
Assert.fail();
}
+ try {
+ TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+ converter.prepare(null);
+ converter.process(new StreamsDatum(TWITTER_JSON));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
assertThat(activity, is(not(nullValue())));
assertThat(activity.getId(), is(not(nullValue())));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 4c8a5aa..323e40c 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -14,5 +14,4 @@ public class StreamsJacksonModule extends SimpleModule {
addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 6319ba8..f5e9978 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -165,10 +165,18 @@ public class StreamComponent {
public StreamsTask createConnectedTask() {
StreamsTask task;
if(this.processor != null) {
- task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
- task.addInputQueue(this.inQueue);
- for(Queue<StreamsDatum> q : this.outBound.values()) {
- task.addOutputQueue(q);
+ if(this.numTasks > 1) {
+ task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ } else {
+ task = new StreamsProcessorTask(this.processor);
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
}
}
else if(this.writer != null) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 3799480..694cb76 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.local.tasks;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.util.SerializationUtil;
import org.slf4j.Logger;
@@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
private ObjectMapper mapper;
public BaseStreamsTask() {
- this.mapper = new ObjectMapper();
+ this.mapper = new StreamsJacksonMapper();
this.mapper.registerSubtypes(Activity.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-util/pom.xml
----------------------------------------------------------------------
diff --git a/streams-util/pom.xml b/streams-util/pom.xml
index cd6d031..0a48ec9 100644
--- a/streams-util/pom.xml
+++ b/streams-util/pom.xml
@@ -39,6 +39,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file