You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:26 UTC
[48/71] [abbrv] git commit: Adding a missing package
Adding a missing package
git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572764 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2eb90cd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2eb90cd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2eb90cd6
Branch: refs/heads/master
Commit: 2eb90cd654e8c80fd073a570fbc573d07f644a56
Parents: 728a225
Author: sblackmon <sb...@unknown>
Authored: Thu Feb 27 22:37:40 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Thu Feb 27 22:37:40 2014 +0000
----------------------------------------------------------------------
.../processor/TwitterEventProcessor.java | 195 +++++++++++++++++
.../processor/TwitterProfileProcessor.java | 109 ++++++++++
.../twitter/processor/TwitterTypeConverter.java | 217 +++++++++++++++++++
3 files changed, 521 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
new file mode 100644
index 0000000..c3707cb
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -0,0 +1,195 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Converts raw Twitter JSON events into the configured output class.
+ *
+ * When used as a {@link Runnable}, it takes JSON strings from the input queue, parses each
+ * one into an {@link ObjectNode}, passes it through {@link #process(StreamsDatum)} and offers
+ * every resulting datum to the output queue. The loop ends when the {@link #TERMINATE}
+ * marker is taken from the queue or when the thread is interrupted.
+ * {@link #process(StreamsDatum)} can also be called directly.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterEventProcessor implements StreamsProcessor, Runnable {
+
+    private final static String STREAMS_ID = "TwitterEventProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private BlockingQueue<String> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    // Only assigned by the four-argument constructor; process() classifies each event itself.
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    /** Queue item that makes {@link #run()} stop; compared with {@code equals}. */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * @param inQueue  source of raw JSON strings consumed by {@link #run()}
+     * @param outQueue destination for converted datums
+     * @param inClass  stored but not consulted by the conversion logic in this class
+     * @param outClass class each event should be converted to
+     */
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * @param inQueue  source of raw JSON strings consumed by {@link #run()}
+     * @param outQueue destination for converted datums
+     * @param outClass class each event should be converted to
+     */
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+        this.outClass = outClass;
+    }
+
+    /**
+     * Consumes the input queue until {@link #TERMINATE} arrives or the thread is interrupted.
+     * A failure on one item is logged and the loop continues with the next item.
+     */
+    @Override
+    public void run() {
+
+        while (true) {
+            try {
+                // take() blocks until an item is available. The previous poll() returned
+                // null on an empty queue, which failed in the parser and spun the loop.
+                String item = inQueue.take();
+
+                if (TERMINATE.equals(item)) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+
+                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+
+                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+
+                for (StreamsDatum entry : process(rawDatum)) {
+                    outQueue.offer(entry);
+                }
+
+            } catch (InterruptedException e) {
+                // Restore the flag so whoever owns this thread can observe the interrupt.
+                Thread.currentThread().interrupt();
+                LOGGER.info("Interrupted, terminating");
+                break;
+            } catch (Exception e) {
+                LOGGER.warn(STREAMS_ID + " failed to process an item", e);
+            }
+        }
+    }
+
+    /**
+     * Converts a Twitter event to the requested output class.
+     *
+     * @param event    the event as a JSON tree
+     * @param inClass  detected type of the event ({@link Tweet}, {@link Retweet} or {@link Delete})
+     * @param outClass requested type: {@link Activity}, {@link Tweet}, {@link Retweet},
+     *                 {@link Delete} or {@link ObjectNode}
+     * @return the converted object, or {@code null} when the combination is not supported
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+
+        Object result = null;
+
+        // The class literals are the receivers of equals(), so a null inClass or outClass
+        // simply matches nothing instead of throwing a NullPointerException.
+        if (Activity.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY DELETE");
+                result = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                result = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY TWEET");
+                result = twitterJsonTweetActivitySerializer.convert(event);
+            }
+        } else if (Tweet.class.equals(outClass)) {
+            if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if (Retweet.class.equals(outClass)) {
+            if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if (Delete.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if (ObjectNode.class.equals(outClass)) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if (result == null) {
+            // no supported conversion was applied
+            LOGGER.debug("CONVERT FAILED");
+        }
+
+        return result;
+    }
+
+    /**
+     * Placeholder for validation of a converted document.
+     *
+     * @return always {@code true}; no validation is implemented yet
+     */
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    /**
+     * Checks whether a string can be tokenized as JSON from start to end.
+     *
+     * @param json text to check
+     * @return {@code true} if the parser consumed the whole input without error,
+     *         {@code false} for {@code null} or unparseable input
+     */
+    public boolean isValidJSON(final String json) {
+        if (json == null) {
+            return false;
+        }
+
+        JsonParser parser = null;
+        try {
+            // Reuse the shared mapper rather than building a new ObjectMapper per call.
+            parser = mapper.getJsonFactory().createJsonParser(json);
+            while (parser.nextToken() != null) {
+                // consume every token; only the absence of a parse error matters
+            }
+            return true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: invalid JSON", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: could not read JSON", ioe);
+        } finally {
+            if (parser != null) {
+                try {
+                    parser.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing an in-memory parser fails
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Converts one event to the configured output class.
+     *
+     * @param entry datum whose document is the event as an {@link ObjectNode}
+     * @return a single-element list holding the converted datum, or an empty list when the
+     *         document is not an {@link ObjectNode}, cannot be serialized, or cannot be converted
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        Object document = entry.getDocument();
+        if (!(document instanceof ObjectNode)) {
+            LOGGER.warn("{} expected an ObjectNode document but got {}", STREAMS_ID,
+                    document == null ? null : document.getClass());
+            return Lists.newArrayList();
+        }
+        ObjectNode node = (ObjectNode) document;
+
+        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
+
+        // Serialize through the mapper: asText() returns an empty string for container
+        // nodes, so the classifier and the String pass-through never saw the real JSON.
+        String json;
+        try {
+            json = mapper.writeValueAsString(node);
+        } catch (IOException e) {
+            LOGGER.warn(STREAMS_ID + " could not serialize document", e);
+            return Lists.newArrayList();
+        }
+
+        // if the target is string, just pass-through
+        if (String.class.equals(outClass)) {
+            return Lists.newArrayList(new StreamsDatum(json));
+        }
+
+        // since data is coming from outside provider, we don't know what type the events are
+        Class inClass = TwitterEventClassifier.detectClass(json);
+
+        // convert to desired format
+        Object out = convert(node, inClass, outClass);
+
+        if (out != null && validate(out, outClass)) {
+            return Lists.newArrayList(new StreamsDatum(out));
+        }
+
+        return Lists.newArrayList();
+    }
+
+    /** No configuration is required; intentionally empty. */
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    /** Holds no resources that need releasing; intentionally empty. */
+    @Override
+    public void cleanUp() {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
new file mode 100644
index 0000000..97bcbaf
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
@@ -0,0 +1,109 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Extracts the {@link User} profile from Twitter events.
+ *
+ * For a {@link Tweet} the author is emitted; for a {@link Retweet} the author of the
+ * retweeted status is emitted. Other event types produce no output.
+ *
+ * When used as a {@link Runnable}, it polls the input queue, passes each datum through
+ * {@link #process(StreamsDatum)} and offers the results to the output queue, until a datum
+ * whose document equals {@link #TERMINATE} is received or the thread is interrupted.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    /** Document value that makes {@link #run()} stop; compared with {@code equals}. */
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * Creates a processor without queues. {@link #process(StreamsDatum)} is usable;
+     * {@link #run()} returns immediately because it has nothing to read from.
+     */
+    public TwitterProfileProcessor() {
+    }
+
+    /**
+     * Creates a processor that can be run as a queue-to-queue worker.
+     *
+     * @param inQueue  source of datums consumed by {@link #run()}
+     * @param outQueue destination for extracted user datums
+     */
+    public TwitterProfileProcessor(Queue<StreamsDatum> inQueue, Queue<StreamsDatum> outQueue) {
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+    }
+
+    /**
+     * Consumes the input queue until the terminate marker arrives or the thread is interrupted.
+     * A failure on one item is logged and the loop continues with the next item.
+     */
+    @Override
+    public void run() {
+
+        if (inQueue == null || outQueue == null) {
+            // Without queues every iteration would throw; the old loop spun forever on that.
+            LOGGER.error("TwitterProfileProcessor started without queues, nothing to run");
+            return;
+        }
+
+        // One generator for the whole loop instead of a new one per item.
+        Random random = new Random();
+
+        while (true) {
+            try {
+                StreamsDatum item = inQueue.poll();
+
+                if (item == null) {
+                    // Queue is empty: back off briefly instead of dereferencing null.
+                    Thread.sleep(1 + random.nextInt(100));
+                    continue;
+                }
+
+                // Compare the document, not the datum: a StreamsDatum never equals a String,
+                // so the previous check could not stop the loop.
+                if (TERMINATE.equals(item.getDocument())) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+
+                // NOTE(review): random pause kept from the original; its purpose is not evident here.
+                Thread.sleep(random.nextInt(100));
+
+                for (StreamsDatum entry : process(item)) {
+                    outQueue.offer(entry);
+                }
+
+            } catch (InterruptedException e) {
+                // Restore the flag so whoever owns this thread can observe the interrupt.
+                Thread.currentThread().interrupt();
+                LOGGER.info("Interrupted, terminating");
+                break;
+            } catch (Exception e) {
+                LOGGER.warn("TwitterProfileProcessor failed to process an item", e);
+            }
+        }
+    }
+
+    /**
+     * Extracts the user profile from one event.
+     *
+     * @param entry datum whose document is the event, either as a JSON string or an {@link ObjectNode}
+     * @return a single-element list holding the user datum, or an empty list when the event
+     *         is neither a tweet nor a retweet, carries no user, or cannot be processed
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        try {
+            // since data is coming from outside provider, we don't know what type the events are
+            String item;
+            if (entry.getDocument() instanceof String) {
+                item = (String) entry.getDocument();
+            } else {
+                item = mapper.writeValueAsString((ObjectNode) entry.getDocument());
+            }
+
+            Class inClass = TwitterEventClassifier.detectClass(item);
+
+            User user = null;
+
+            // Class literal as receiver: a null inClass falls through to "no output".
+            if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("TWEET");
+                Tweet tweet = mapper.readValue(item, Tweet.class);
+                user = tweet.getUser();
+            } else if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("RETWEET");
+                Retweet retweet = mapper.readValue(item, Retweet.class);
+                if (retweet.getRetweetedStatus() != null) {
+                    user = retweet.getRetweetedStatus().getUser();
+                }
+            }
+
+            // Do not emit a datum with a null document when the event has no user.
+            if (user != null) {
+                result.add(new StreamsDatum(user));
+            }
+
+            return result;
+        } catch (Exception e) {
+            // Concatenation rather than entry.toString(), so a null entry cannot throw here.
+            LOGGER.warn("Error processing " + entry, e);
+            return Lists.newArrayList();
+        }
+    }
+
+    /** No configuration is required; intentionally empty. */
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    /** Holds no resources that need releasing; intentionally empty. */
+    @Override
+    public void cleanUp() {
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
new file mode 100644
index 0000000..73744c1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -0,0 +1,217 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Converts Twitter events, supplied as JSON strings or {@link ObjectNode}s, into the
+ * configured output class.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    // Stored by the constructor; process() classifies each event itself.
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * @param inClass  stored but not consulted by the conversion logic in this class
+     * @param outClass class each event should be converted to
+     */
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    /**
+     * @return the output queue field.
+     *         NOTE(review): nothing in this class assigns it, so this returns {@code null}.
+     */
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Stores the input queue. Nothing in this class reads from it.
+     *
+     * @param inputQueue queue to store
+     */
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    /**
+     * Converts a Twitter event to the requested output class.
+     *
+     * @param event    the event as a JSON tree
+     * @param inClass  detected type of the event ({@link Tweet}, {@link Retweet} or {@link Delete})
+     * @param outClass requested type: {@link Activity}, {@link Tweet}, {@link Retweet},
+     *                 {@link Delete} or {@link ObjectNode}
+     * @return the converted object, or {@code null} when the combination is not supported
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+
+        Object result = null;
+
+        // The class literals are the receivers of equals(), so a null inClass or outClass
+        // simply matches nothing instead of throwing a NullPointerException.
+        if (Activity.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY DELETE");
+                result = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                result = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("ACTIVITY TWEET");
+                result = twitterJsonTweetActivitySerializer.convert(event);
+            }
+        } else if (Tweet.class.equals(outClass)) {
+            if (Tweet.class.equals(inClass)) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if (Retweet.class.equals(outClass)) {
+            if (Retweet.class.equals(inClass)) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if (Delete.class.equals(outClass)) {
+            if (Delete.class.equals(inClass)) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if (ObjectNode.class.equals(outClass)) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if (result == null) {
+            // no supported conversion was applied
+            LOGGER.debug("CONVERT FAILED");
+        }
+
+        return result;
+    }
+
+    /**
+     * Placeholder for validation of a converted document.
+     *
+     * @return always {@code true}; no validation is implemented yet
+     */
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    /**
+     * Checks whether a string can be tokenized as JSON from start to end.
+     *
+     * @param json text to check
+     * @return {@code true} if the parser consumed the whole input without error,
+     *         {@code false} for {@code null} or unparseable input
+     */
+    public boolean isValidJSON(final String json) {
+        if (json == null) {
+            return false;
+        }
+
+        JsonParser parser = null;
+        try {
+            // Reuse the shared mapper rather than building a new ObjectMapper per call.
+            parser = mapper.getJsonFactory().createJsonParser(json);
+            while (parser.nextToken() != null) {
+                // consume every token; only the absence of a parse error matters
+            }
+            return true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: invalid JSON", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: could not read JSON", ioe);
+        } finally {
+            if (parser != null) {
+                try {
+                    parser.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing an in-memory parser fails
+                }
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Converts one event to the configured output class.
+     *
+     * A String document is passed through unchanged when the output class is String;
+     * otherwise it is parsed, classified and converted. An {@link ObjectNode} document is
+     * classified from its serialized JSON and converted. Any other document type, and any
+     * failure along the way, yields an empty list.
+     *
+     * @param entry datum whose document is a JSON string or an {@link ObjectNode}
+     * @return a single-element list holding the result, or an empty list
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+
+            if (item == null) {
+                LOGGER.debug("{} received a datum without a document", STREAMS_ID);
+                return Lists.newArrayList();
+            }
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if (item instanceof String) {
+
+                String json = (String) item;
+
+                if (String.class.equals(outClass)) {
+                    // if the target is string, just pass-through
+                    result = entry;
+                } else {
+                    // parsing doubles as the JSON validity check
+                    ObjectNode node = (ObjectNode) mapper.readTree(json);
+                    result = convertToDatum(node, json);
+                }
+
+            } else if (item instanceof ObjectNode) {
+
+                ObjectNode node = (ObjectNode) mapper.valueToTree(item);
+
+                // The classifier takes JSON text. The node used to be cast to String here,
+                // which always threw ClassCastException, so ObjectNode input was never converted.
+                result = convertToDatum(node, mapper.writeValueAsString(node));
+
+            }
+
+        } catch (Exception e) {
+            LOGGER.warn(STREAMS_ID + " failed to process " + entry, e);
+        }
+
+        if (result != null) {
+            return Lists.newArrayList(result);
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * Classifies the event from its JSON text, converts it to the configured output class
+     * and wraps the result; returns {@code null} when conversion or validation fails.
+     */
+    private StreamsDatum convertToDatum(ObjectNode node, String json) {
+
+        // since data is coming from outside provider, we don't know what type the events are
+        Class detectedClass = TwitterEventClassifier.detectClass(json);
+
+        Object out = convert(node, detectedClass, outClass);
+
+        if (out != null && validate(out, outClass)) {
+            return new StreamsDatum(out);
+        }
+        return null;
+    }
+
+    /** No configuration is required; intentionally empty. */
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    /** Holds no resources that need releasing; intentionally empty. */
+    @Override
+    public void cleanUp() {
+
+    }
+}