You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:17 UTC
[39/71] [abbrv] git commit: adding a few missing files
adding a few missing files
git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1571490 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c0281362
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c0281362
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c0281362
Branch: refs/heads/master
Commit: c0281362be2062cc9199c48070b4bd3e16b9c142
Parents: 3d7ff8f
Author: sblackmon <sb...@unknown>
Authored: Mon Feb 24 23:34:13 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Mon Feb 24 23:34:13 2014 +0000
----------------------------------------------------------------------
.../streams-provider-rss.iml | 16 +-
.../provider/TwitterProfileProcessor.java | 111 +++++++++++
.../twitter/provider/TwitterTypeConverter.java | 199 +++++++++++++++++++
.../apache/streams/util/SerializationUtil.java | 50 +++++
4 files changed, 371 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-rss/streams-provider-rss.iml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/streams-provider-rss.iml b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
index f27b242..2846a74 100644
--- a/streams-contrib/streams-provider-rss/streams-provider-rss.iml
+++ b/streams-contrib/streams-provider-rss/streams-provider-rss.iml
@@ -1,15 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
<output url="file://$MODULE_DIR$/target/classes" />
<output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
- <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
- <excludeFolder url="file://$MODULE_DIR$/target" />
+ <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
+ <excludeFolder url="file://$MODULE_DIR$/target/classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
+ <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
+ <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
@@ -30,6 +35,9 @@
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
<orderEntry type="module" module-name="streams-core" />
+ <orderEntry type="module" module-name="streams-util" />
+ <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
<orderEntry type="module" module-name="streams-pojo" />
@@ -49,8 +57,6 @@
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
<orderEntry type="module" module-name="streams-config" />
- <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
- <orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" />
<orderEntry type="library" name="Maven: net.minidev:json-smart:1.2" level="project" />
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
new file mode 100644
index 0000000..3f9c24b
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
@@ -0,0 +1,111 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private Queue<StreamsDatum> inQueue;
+ private Queue<StreamsDatum> outQueue;
+
+ public final static String TERMINATE = new String("TERMINATE");
+
+ @Override
+ public void run() {
+
+ while(true) {
+ StreamsDatum item;
+ try {
+ item = inQueue.poll();
+ if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
+ LOGGER.info("Terminating!");
+ break;
+ }
+
+ Thread.sleep(new Random().nextInt(100));
+
+ for( StreamsDatum entry : process(item)) {
+ outQueue.offer(entry);
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+ String item;
+ try {
+ // first check for valid json
+ // since data is coming from outside provider, we don't know what type the events are
+ if( entry.getDocument() instanceof String) {
+ item = (String) entry.getDocument();
+ } else {
+ item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
+ }
+
+ Class inClass = TwitterEventClassifier.detectClass(item);
+
+ User user;
+
+ if ( inClass.equals( Tweet.class )) {
+ LOGGER.debug("TWEET");
+ Tweet tweet = mapper.readValue(item, Tweet.class);
+ user = tweet.getUser();
+ result.add(new StreamsDatum(user));
+ }
+ else if ( inClass.equals( Retweet.class )) {
+ LOGGER.debug("RETWEET");
+ Retweet retweet = mapper.readValue(item, Retweet.class);
+ user = retweet.getRetweetedStatus().getUser();
+ result.add(new StreamsDatum(user));
+ } else {
+ return Lists.newArrayList();
+ }
+
+ return result;
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Error processing " + entry.toString());
+ return Lists.newArrayList();
+ }
+ }
+
+ @Override
+ public void prepare(Object o) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
new file mode 100644
index 0000000..0b0507d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
@@ -0,0 +1,199 @@
+package org.apache.streams.twitter.provider;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class TwitterTypeConverter implements StreamsProcessor, Runnable {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private Queue<StreamsDatum> inQueue;
+ private Queue<StreamsDatum> outQueue;
+
+ private Class inClass;
+ private Class outClass;
+
+ private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
+ private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
+ private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+
+ public final static String TERMINATE = new String("TERMINATE");
+
+ public TwitterTypeConverter(Class inClass, Class outClass) {
+ this.inClass = inClass;
+ this.outClass = outClass;
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+ public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+ inQueue = inputQueue;
+ }
+
+ public Object convert(ObjectNode event, Class inClass, Class outClass) {
+
+ LOGGER.debug(event.toString());
+
+ Object result = null;
+
+ if( outClass.equals( Activity.class )) {
+ if( inClass.equals( Delete.class )) {
+ LOGGER.debug("ACTIVITY DELETE");
+ result = twitterJsonDeleteActivitySerializer.convert(event);
+ } else if ( inClass.equals( Retweet.class )) {
+ LOGGER.debug("ACTIVITY RETWEET");
+ result = twitterJsonRetweetActivitySerializer.convert(event);
+ } else if ( inClass.equals( Tweet.class )) {
+ LOGGER.debug("ACTIVITY TWEET");
+ result = twitterJsonTweetActivitySerializer.convert(event);
+ } else {
+ return null;
+ }
+ } else if( outClass.equals( Tweet.class )) {
+ if ( inClass.equals( Tweet.class )) {
+ LOGGER.debug("TWEET");
+ result = mapper.convertValue(event, Tweet.class);
+ }
+ } else if( outClass.equals( Retweet.class )) {
+ if ( inClass.equals( Retweet.class )) {
+ LOGGER.debug("RETWEET");
+ result = mapper.convertValue(event, Retweet.class);
+ }
+ } else if( outClass.equals( Delete.class )) {
+ if ( inClass.equals( Delete.class )) {
+ LOGGER.debug("DELETE");
+ result = mapper.convertValue(event, Delete.class);
+ }
+ } else if( outClass.equals( ObjectNode.class )) {
+ LOGGER.debug("OBJECTNODE");
+ result = mapper.convertValue(event, ObjectNode.class);
+ }
+
+ // no supported conversion were applied
+ if( result != null )
+ return result;
+
+ LOGGER.debug("CONVERT FAILED");
+
+ return null;
+
+ }
+
+ public boolean validate(Object document, Class klass) {
+
+ // TODO
+ return true;
+ }
+
+ public boolean isValidJSON(final String json) {
+ boolean valid = false;
+ try {
+ final JsonParser parser = new ObjectMapper().getJsonFactory()
+ .createJsonParser(json);
+ while (parser.nextToken() != null) {
+ }
+ valid = true;
+ } catch (JsonParseException jpe) {
+ LOGGER.warn("validate: {}", jpe);
+ } catch (IOException ioe) {
+ LOGGER.warn("validate: {}", ioe);
+ }
+
+ return valid;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ StreamsDatum result = null;
+
+ try {
+
+ Object item = entry.getDocument();
+ ObjectNode node;
+
+ if( item instanceof String ) {
+
+ // if the target is string, just pass-through
+ if( String.class.equals(outClass))
+ outQueue.offer(entry);
+ else {
+ // first check for valid json
+ node = (ObjectNode)mapper.readTree((String)item);
+
+ // since data is coming from outside provider, we don't know what type the events are
+ Class inClass = TwitterEventClassifier.detectClass((String)item);
+
+ Object out = convert(node, inClass, outClass);
+
+ if( out != null && validate(out, outClass))
+ result = new StreamsDatum(out);
+ }
+
+ } else if( item instanceof ObjectNode ) {
+
+ // first check for valid json
+ node = (ObjectNode)mapper.valueToTree(item);
+
+ // since data is coming from outside provider, we don't know what type the events are
+ Class inClass = TwitterEventClassifier.detectClass((String)item);
+
+ Object out = convert(node, inClass, outClass);
+
+ if( out != null && validate(out, outClass))
+ result = new StreamsDatum(out);
+
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if( result != null )
+ return Lists.newArrayList(result);
+ else
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public void prepare(Object o) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c0281362/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
new file mode 100644
index 0000000..f5c41b0
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -0,0 +1,50 @@
+package org.apache.streams.util;
+
+import java.io.*;
+
/**
 * Helpers for Java built-in serialization: converting objects to and from byte arrays, and
 * making deep copies through a serialize/deserialize round trip.
 *
 * <p>Created by rebanks on 2/18/14. {@link #serialize(Object)} and {@link #deserialize(byte[])}
 * are borrowed from the Apache Storm project.
 */
public class SerializationUtil {

    /**
     * Serializes an object graph with Java built-in serialization.
     * Borrowed from the Apache Storm project.
     *
     * @param obj the object to serialize; must be {@link java.io.Serializable} or {@code null}
     * @return the serialized bytes
     * @throws RuntimeException wrapping the underlying {@link IOException} when serialization
     *     fails, e.g. a {@link NotSerializableException} for a non-serializable object in the graph
     */
    public static byte[] serialize(Object obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            // close() flushes ObjectOutputStream's internal buffer into bos. It must run before
            // toByteArray(), or the returned bytes would be truncated.
            oos.close();
            return bos.toByteArray();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Reconstructs an object from bytes produced by {@link #serialize(Object)}.
     * Borrowed from the Apache Storm project.
     *
     * <p><b>Security:</b> Java deserialization of attacker-controlled bytes can trigger code in
     * classes on the classpath. Only pass bytes that come from a trusted source.
     *
     * @param serialized bytes written by Java built-in serialization
     * @return the deserialized object ({@code null} if {@code null} was serialized)
     * @throws RuntimeException wrapping the {@link IOException} or
     *     {@link ClassNotFoundException} raised while reading the stream
     */
    public static Object deserialize(byte[] serialized) {
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
            ObjectInputStream ois = new ObjectInputStream(bis);
            Object ret = ois.readObject();
            ois.close();
            return ret;
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a deep copy of {@code obj}, made by serializing it and immediately deserializing
     * the bytes.
     *
     * <p>The copy is returned as the argument's static type, so callers no longer need a cast.
     * This holds for ordinary serializable classes. A class whose {@code writeReplace} or
     * {@code readResolve} yields an unrelated type will instead fail with a
     * {@link ClassCastException} at the call site.
     *
     * @param obj the object to copy; must be {@link java.io.Serializable} or {@code null}
     * @param <T> the type of the object being copied
     * @return an independent copy of {@code obj}
     * @throws RuntimeException if serialization or deserialization fails
     */
    @SuppressWarnings("unchecked")
    public static <T> T cloneBySerialization(T obj) {
        // Unchecked but safe for ordinary types: deserialization recreates the same class.
        return (T) deserialize(serialize(obj));
    }
}