You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:09 UTC

[22/52] [abbrv] git commit: Changes

Changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d8feb5ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d8feb5ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d8feb5ba

Branch: refs/heads/sblackmon
Commit: d8feb5ba06afae4716d3122d79796e2347679640
Parents: 5ff406f
Author: rebanks <re...@w2odigital.com>
Authored: Wed Apr 2 12:01:10 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Wed Apr 2 12:01:10 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchPersistWriter.java |  3 ++-
 .../streams/twitter/test/SimpleTweetTest.java     | 18 +++++++++++-------
 .../streams/local/builders/StreamComponent.java   | 16 ++++++++++++----
 .../streams/local/tasks/BaseStreamsTask.java      |  3 ++-
 4 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 9390219..0e16c2c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -97,7 +98,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = new StreamsJacksonMapper();
 
     private ElasticsearchWriterConfiguration config;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 8988de0..b8bfe1a 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -3,14 +3,11 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.junit.Assert;
@@ -24,8 +21,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 import static org.hamcrest.CoreMatchers.*;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -82,6 +77,15 @@ public class SimpleTweetTest {
             Assert.fail();
         }
 
+        try {
+            TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+            converter.prepare(null);
+            converter.process(new StreamsDatum(TWITTER_JSON));
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
         assertThat(activity, is(not(nullValue())));
 
         assertThat(activity.getId(), is(not(nullValue())));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 6319ba8..f5e9978 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -165,10 +165,18 @@ public class StreamComponent {
     public StreamsTask createConnectedTask() {
         StreamsTask task;
         if(this.processor != null) {
-            task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
-            task.addInputQueue(this.inQueue);
-            for(Queue<StreamsDatum> q : this.outBound.values()) {
-                task.addOutputQueue(q);
+            if(this.numTasks > 1) {
+                task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+                task.addInputQueue(this.inQueue);
+                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                    task.addOutputQueue(q);
+                }
+            } else {
+                task = new StreamsProcessorTask(this.processor);
+                task.addInputQueue(this.inQueue);
+                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                    task.addOutputQueue(q);
+                }
             }
         }
         else if(this.writer != null) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 3799480..694cb76 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.local.tasks;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.util.SerializationUtil;
 import org.slf4j.Logger;
@@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
     private ObjectMapper mapper;
 
     public BaseStreamsTask() {
-        this.mapper = new ObjectMapper();
+        this.mapper = new StreamsJacksonMapper();
         this.mapper.registerSubtypes(Activity.class);
     }