You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:26 UTC

[48/71] [abbrv] git commit: Adding a missing package

Adding a missing package

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572764 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2eb90cd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2eb90cd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2eb90cd6

Branch: refs/heads/master
Commit: 2eb90cd654e8c80fd073a570fbc573d07f644a56
Parents: 728a225
Author: sblackmon <sb...@unknown>
Authored: Thu Feb 27 22:37:40 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Thu Feb 27 22:37:40 2014 +0000

----------------------------------------------------------------------
 .../processor/TwitterEventProcessor.java        | 195 +++++++++++++++++
 .../processor/TwitterProfileProcessor.java      | 109 ++++++++++
 .../twitter/processor/TwitterTypeConverter.java | 217 +++++++++++++++++++
 3 files changed, 521 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
new file mode 100644
index 0000000..c3707cb
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -0,0 +1,195 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterEventProcessor implements StreamsProcessor, Runnable {
+
+    private final static String STREAMS_ID = "TwitterEventProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private BlockingQueue<String> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /** Creates a processor that reads JSON strings from inQueue and publishes results to outQueue. */
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+        this.inQueue = inQueue;
+        this.outQueue = outQueue;
+    }
+
+    /** Same wiring as the four-argument constructor, with inClass left unset (null). */
+    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
+        this(inQueue, outQueue, null, outClass);
+    }
+
+    /** Drains inQueue until TERMINATE arrives or the thread is interrupted, publishing results to outQueue. */
+    public void run() {
+        while(true) {
+            try {
+                // take() blocks until an item is available; poll() returned null on an
+                // empty queue, which made this loop spin and throw on every pass.
+                String item = inQueue.take();
+                if(TERMINATE.equals(item)) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+                // Each queue item is parsed as JSON text and must be a JSON object.
+                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+                StreamsDatum rawDatum = new StreamsDatum(objectNode);
+                for( StreamsDatum entry : process(rawDatum)) {
+                    outQueue.offer(entry);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the owner
+                break;
+            } catch (Exception e) {
+                LOGGER.warn("Unable to process queued item", e); // skip bad items, keep running
+            }
+        }
+    }
+
+    /**
+     * Maps a Twitter event onto the requested output class.
+     * Activity output is built by the serializer matching the event type; Tweet, Retweet and
+     * Delete output is produced only from an event of that same class; ObjectNode output is
+     * produced for any event.
+     *
+     * @param event    the event as a JSON tree
+     * @param inClass  the class of the incoming event
+     * @param outClass the class to convert to
+     * @return the converted object, or null when no supported conversion produced a value
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+        Object converted = null;
+
+        if( outClass.equals( Activity.class )) {
+            if( inClass.equals( Delete.class )) {
+                LOGGER.debug("ACTIVITY DELETE");
+                converted = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                converted = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("ACTIVITY TWEET");
+                converted = twitterJsonTweetActivitySerializer.convert(event);
+            } else {
+                // unrecognized event types leave here, without the failure message below
+                return null;
+            }
+        } else if( outClass.equals( Tweet.class ) && inClass.equals( Tweet.class )) {
+            LOGGER.debug("TWEET");
+            converted = mapper.convertValue(event, Tweet.class);
+        } else if( outClass.equals( Retweet.class ) && inClass.equals( Retweet.class )) {
+            LOGGER.debug("RETWEET");
+            converted = mapper.convertValue(event, Retweet.class);
+        } else if( outClass.equals( Delete.class ) && inClass.equals( Delete.class )) {
+            LOGGER.debug("DELETE");
+            converted = mapper.convertValue(event, Delete.class);
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            converted = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if( converted == null )
+            LOGGER.debug("CONVERT FAILED");
+        return converted;
+    }
+
+    public boolean validate(Object document, Class klass) {
+        // Placeholder hook: no checks are implemented yet, so every document is accepted.
+        // TODO
+        return true;
+    }
+
+    /** Returns true when json parses cleanly to its end; null or malformed input yields false. */
+    public boolean isValidJSON(final String json) {
+        if( json == null )
+            return false;
+        try {
+            // reuse the shared mapper's factory instead of building an ObjectMapper per call
+            final JsonParser parser = mapper.getJsonFactory().createJsonParser(json);
+            while (parser.nextToken() != null) {
+                // drain every token; malformed content throws before the stream ends
+            }
+            return true;
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe.getMessage(), ioe);
+            return false;
+        }
+    }
+
+    /** Classifies one Twitter event and converts it to outClass; yields an empty list if no conversion applies. */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        // the document is expected to be an ObjectNode, which is what run() supplies
+        ObjectNode node = (ObjectNode) entry.getDocument();
+
+        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
+
+        // asText() returns "" for container nodes such as ObjectNode, which hid the
+        // event from the classifier and the pass-through; toString() renders the JSON
+        String json = node.toString();
+
+        // since data is coming from outside provider, we don't know what type the events are
+        Class inClass = TwitterEventClassifier.detectClass(json);
+
+        // if the target is string, just pass-through
+        if( java.lang.String.class.equals(outClass))
+            return Lists.newArrayList(new StreamsDatum(json));
+
+        // convert to desired format
+        Object out = convert(node, inClass, outClass);
+        if( out != null && validate(out, outClass))
+            return Lists.newArrayList(new StreamsDatum(out));
+
+        // no supported conversion, or validation rejected the result
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // No-op: this processor performs no setup and ignores configurationObject.
+    }
+
+    @Override
+    public void cleanUp() {
+        // No-op: this processor releases nothing on shutdown.
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
new file mode 100644
index 0000000..97bcbaf
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
@@ -0,0 +1,109 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /** Polls inQueue until a TERMINATE document arrives or the thread is interrupted; output goes to outQueue. */
+    @Override
+    public void run() {
+        while(true) {
+            try {
+                StreamsDatum item = inQueue.poll();
+                // compare the wrapped document; the old check compared the StreamsDatum itself to a String
+                if(item != null && TERMINATE.equals(item.getDocument())) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+                // randomized pause kept from the original loop; it also paces polling when idle
+                Thread.sleep(new Random().nextInt(100));
+                if(item == null) continue; // poll() returns null when nothing is queued
+                for( StreamsDatum entry : process(item)) {
+                    outQueue.offer(entry);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the owner
+                break;
+            } catch (Exception e) {
+                LOGGER.warn("Unable to process queued item", e);
+            }
+        }
+    }
+
+    /** Extracts the User from a Tweet or Retweet document; any other input yields an empty list. */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        try {
+            // documents arrive either as raw JSON text or as an ObjectNode to be serialized
+            Object document = entry.getDocument();
+            String item;
+            if( document instanceof String) {
+                item = (String) document;
+            } else {
+                item = mapper.writeValueAsString((ObjectNode) document);
+            }
+
+            // since data is coming from outside provider, we don't know what type the events are
+            Class inClass = TwitterEventClassifier.detectClass(item);
+
+            User user;
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                Tweet tweet = mapper.readValue(item, Tweet.class);
+                user = tweet.getUser();
+            } else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                // for a retweet, use the user attached to the retweeted status
+                user = mapper.readValue(item, Retweet.class).getRetweetedStatus().getUser();
+            } else {
+                // only Tweet and Retweet events are handled here
+                return Lists.newArrayList();
+            }
+
+            List<StreamsDatum> result = Lists.newArrayList();
+            result.add(new StreamsDatum(user));
+            return result;
+        } catch (Exception e) {
+            // log through SLF4J with the cause instead of printStackTrace(); concatenating
+            // entry (rather than calling toString()) keeps a null entry from throwing here
+            LOGGER.warn("Error processing " + entry, e);
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+        // No-op: this processor performs no setup and ignores its argument.
+    }
+
+    @Override
+    public void cleanUp() {
+        // No-op: this processor releases nothing on shutdown.
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2eb90cd6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
new file mode 100644
index 0000000..73744c1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -0,0 +1,217 @@
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor {
+
+    private final static String STREAMS_ID = "TwitterTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    public final static String TERMINATE = new String("TERMINATE");
+    /** Creates a converter between the given classes; the queues are not set here. */
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+    /** Returns outQueue. NOTE(review): nothing in this class assigns it, so this looks to always return null -- confirm. */
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+    /** Stores the input queue; no active code in this class reads it (the run() loop that did is commented out). */
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    /**
+     * Maps a Twitter event onto the requested output class.
+     * Activity output is built by the serializer matching the event type; Tweet, Retweet and
+     * Delete output is produced only from an event of that same class; ObjectNode output is
+     * produced for any event.
+     *
+     * @param event    the event as a JSON tree
+     * @param inClass  the class of the incoming event
+     * @param outClass the class to convert to
+     * @return the converted object, or null when no supported conversion produced a value
+     */
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+        Object converted = null;
+
+        if( outClass.equals( Activity.class )) {
+            if( inClass.equals( Delete.class )) {
+                LOGGER.debug("ACTIVITY DELETE");
+                converted = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                converted = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("ACTIVITY TWEET");
+                converted = twitterJsonTweetActivitySerializer.convert(event);
+            } else {
+                // unrecognized event types leave here, without the failure message below
+                return null;
+            }
+        } else if( outClass.equals( Tweet.class ) && inClass.equals( Tweet.class )) {
+            LOGGER.debug("TWEET");
+            converted = mapper.convertValue(event, Tweet.class);
+        } else if( outClass.equals( Retweet.class ) && inClass.equals( Retweet.class )) {
+            LOGGER.debug("RETWEET");
+            converted = mapper.convertValue(event, Retweet.class);
+        } else if( outClass.equals( Delete.class ) && inClass.equals( Delete.class )) {
+            LOGGER.debug("DELETE");
+            converted = mapper.convertValue(event, Delete.class);
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            converted = mapper.convertValue(event, ObjectNode.class);
+        }
+
+        if( converted == null )
+            LOGGER.debug("CONVERT FAILED");
+        return converted;
+    }
+
+    public boolean validate(Object document, Class klass) {
+        // Placeholder hook: no checks are implemented yet, so every document is accepted.
+        // TODO
+        return true;
+    }
+
+    /** Returns true when json parses cleanly to its end; null or malformed input yields false. */
+    public boolean isValidJSON(final String json) {
+        if( json == null )
+            return false;
+        try {
+            // reuse the shared mapper's factory instead of building an ObjectMapper per call
+            final JsonParser parser = mapper.getJsonFactory().createJsonParser(json);
+            while (parser.nextToken() != null) {
+                // drain every token; malformed content throws before the stream ends
+            }
+            return true;
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe.getMessage(), ioe);
+            return false;
+        }
+    }
+
+    /** Converts one datum (JSON text or ObjectNode) into outClass; returns an empty list when nothing applies. */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // malformed JSON makes readTree throw, which the catch below logs
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    Class inClass = TwitterEventClassifier.detectClass((String) item);
+
+                    Object out = convert(node, inClass, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode ) {
+
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // the classifier takes JSON text; the old (String) cast of an ObjectNode always
+                // threw ClassCastException, so serialize the node instead
+                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(node));
+
+                Object out = convert(node, inClass, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            LOGGER.warn("Unable to convert datum", e);
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+        // No-op: this converter performs no setup and ignores its argument.
+    }
+
+    @Override
+    public void cleanUp() {
+        // No-op: this converter releases nothing on shutdown.
+    }
+
+//    public void run() {
+//        while(true) {
+//            StreamsDatum item;
+//            try {
+//                item = inQueue.poll();
+//                if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
+//                    LOGGER.info("Terminating!");
+//                    break;
+//                }
+//
+//                for( StreamsDatum entry : process(item)) {
+//                    outQueue.offer(entry);
+//                }
+//
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//
+//            }
+//        }
+//    }
+}