You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:17 UTC

[39/71] [abbrv] git commit: adding a few missing files

adding a few missing files

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1571490 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c0281362
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c0281362
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c0281362

Branch: refs/heads/master
Commit: c0281362be2062cc9199c48070b4bd3e16b9c142
Parents: 3d7ff8f
Author: sblackmon <sb...@unknown>
Authored: Mon Feb 24 23:34:13 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Mon Feb 24 23:34:13 2014 +0000

----------------------------------------------------------------------
 .../streams-provider-rss.iml                    |  16 +-
 .../provider/TwitterProfileProcessor.java       | 111 +++++++++++
 .../twitter/provider/TwitterTypeConverter.java  | 199 +++++++++++++++++++
 .../apache/streams/util/SerializationUtil.java  |  50 +++++
 4 files changed, 371 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-rss/streams-provider-rss.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/streams-provider-rss.iml b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
index f27b242..2846a74 100644
--- a/streams-contrib/streams-provider-rss/streams-provider-rss.iml
+++ b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
@@ -1,15 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
   <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
     <output url="file://$MODULE_DIR$/target/classes" />
     <output-test url="file://$MODULE_DIR$/target/test-classes" />
     <content url="file://$MODULE_DIR$">
-      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
       <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
       <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
       <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
       <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
-      <excludeFolder url="file://$MODULE_DIR$/target" />
+      <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
+      <excludeFolder url="file://$MODULE_DIR$/target/classes" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
+      <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
+      <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
+      <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
@@ -30,6 +35,9 @@
     <orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
     <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
     <orderEntry type="module" module-name="streams-core" />
+    <orderEntry type="module" module-name="streams-util" />
+    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
     <orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
     <orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
     <orderEntry type="module" module-name="streams-pojo" />
@@ -49,8 +57,6 @@
     <orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
     <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
     <orderEntry type="module" module-name="streams-config" />
-    <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
-    <orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" />
     <orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" />
     <orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" />
     <orderEntry type="library" name="Maven: net.minidev:json-smart:1.2" level="project" />

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
new file mode 100644
index 0000000..3f9c24b
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
@@ -0,0 +1,111 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    @Override
+    public void run() {
+
+        while(true) {
+            StreamsDatum item;
+                try {
+                    item = inQueue.poll();
+                    if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
+                        LOGGER.info("Terminating!");
+                        break;
+                    }
+
+                    Thread.sleep(new Random().nextInt(100));
+
+                    for( StreamsDatum entry : process(item)) {
+                        outQueue.offer(entry);
+                    }
+
+
+            } catch (Exception e) {
+                e.printStackTrace();
+
+            }
+        }
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+        String item;
+        try {
+            // first check for valid json
+            // since data is coming from outside provider, we don't know what type the events are
+            if( entry.getDocument() instanceof String) {
+                item = (String) entry.getDocument();
+            } else {
+                item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
+            }
+
+            Class inClass = TwitterEventClassifier.detectClass(item);
+
+            User user;
+
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                Tweet tweet = mapper.readValue(item, Tweet.class);
+                user = tweet.getUser();
+                result.add(new StreamsDatum(user));
+            }
+            else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                Retweet retweet = mapper.readValue(item, Retweet.class);
+                user = retweet.getRetweetedStatus().getUser();
+                result.add(new StreamsDatum(user));
+            } else {
+                return Lists.newArrayList();
+            }
+
+            return result;
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Error processing " + entry.toString());
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
new file mode 100644
index 0000000..0b0507d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
@@ -0,0 +1,199 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public TwitterTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) {
+
+        LOGGER.debug(event.toString());
+
+        Object result = null;
+
+        if( outClass.equals( Activity.class )) {
+            if( inClass.equals( Delete.class )) {
+                LOGGER.debug("ACTIVITY DELETE");
+                result = twitterJsonDeleteActivitySerializer.convert(event);
+            } else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("ACTIVITY RETWEET");
+                result = twitterJsonRetweetActivitySerializer.convert(event);
+            } else if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("ACTIVITY TWEET");
+                result = twitterJsonTweetActivitySerializer.convert(event);
+            } else {
+                return null;
+            }
+        } else if( outClass.equals( Tweet.class )) {
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                result = mapper.convertValue(event, Tweet.class);
+            }
+        } else if( outClass.equals( Retweet.class )) {
+            if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                result = mapper.convertValue(event, Retweet.class);
+            }
+        } else if( outClass.equals( Delete.class )) {
+            if ( inClass.equals( Delete.class )) {
+                LOGGER.debug("DELETE");
+                result = mapper.convertValue(event, Delete.class);
+            }
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        }
+
+            // no supported conversion were applied
+        if( result != null )
+            return result;
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass))
+                    outQueue.offer(entry);
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    // since data is coming from outside provider, we don't know what type the events are
+                    Class inClass = TwitterEventClassifier.detectClass((String)item);
+
+                    Object out = convert(node, inClass, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode ) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                // since data is coming from outside provider, we don't know what type the events are
+                Class inClass = TwitterEventClassifier.detectClass((String)item);
+
+                Object out = convert(node, inClass, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    @Override
+    public void run() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
new file mode 100644
index 0000000..f5c41b0
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -0,0 +1,50 @@
+package org.apache.streams.util;
+
+import java.io.*;
+
+/**
+ * Static helpers that turn objects into bytes and back using standard Java
+ * serialization ({@link ObjectOutputStream} and {@link ObjectInputStream}).
+ *
+ * Created by rebanks on 2/18/14.
+ */
+public class SerializationUtil {
+
+    /**
+     * Writes an object to a byte array with Java serialization.
+     * Borrowed from the Apache Storm project.
+     *
+     * @param obj the object to write
+     * @return the bytes produced by {@link ObjectOutputStream#writeObject(Object)}
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    public static byte[] serialize(Object obj) {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try {
+            final ObjectOutputStream objectOut = new ObjectOutputStream(buffer);
+            objectOut.writeObject(obj);
+            objectOut.close();
+        } catch(IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+        return buffer.toByteArray();
+    }
+
+    /**
+     * Reads one object back from bytes written by Java serialization.
+     * Borrowed from the Apache Storm project.
+     *
+     * NOTE(review): Java deserialization should only be fed bytes from a
+     * trusted source.
+     *
+     * @param serialized the serialized form of a single object
+     * @return the object returned by {@link ObjectInputStream#readObject()}
+     * @throws RuntimeException wrapping any {@link IOException} or
+     *         {@link ClassNotFoundException} raised while reading
+     */
+    public static Object deserialize(byte[] serialized) {
+        try {
+            final ObjectInputStream objectIn =
+                    new ObjectInputStream(new ByteArrayInputStream(serialized));
+            final Object restored = objectIn.readObject();
+            objectIn.close();
+            return restored;
+        } catch(ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch(IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    /**
+     * Copies an object by serializing it and deserializing the result.
+     *
+     * @param obj the object to copy
+     * @return the object read back from the serialized form of {@code obj}
+     */
+    public static Object cloneBySerialization(Object obj) {
+        final byte[] snapshot = serialize(obj);
+        return deserialize(snapshot);
+    }
+}