You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:09 UTC
[22/52] [abbrv] git commit: Changes
Changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d8feb5ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d8feb5ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d8feb5ba
Branch: refs/heads/sblackmon
Commit: d8feb5ba06afae4716d3122d79796e2347679640
Parents: 5ff406f
Author: rebanks <re...@w2odigital.com>
Authored: Wed Apr 2 12:01:10 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Wed Apr 2 12:01:10 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchPersistWriter.java | 3 ++-
.../streams/twitter/test/SimpleTweetTest.java | 18 +++++++++++-------
.../streams/local/builders/StreamComponent.java | 16 ++++++++++++----
.../streams/local/tasks/BaseStreamsTask.java | 3 ++-
4 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 9390219..0e16c2c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -97,7 +98,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = new StreamsJacksonMapper();
private ElasticsearchWriterConfiguration config;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 8988de0..b8bfe1a 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -3,14 +3,11 @@ package org.apache.streams.twitter.test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.StringUtils;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.processor.TwitterTypeConverter;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
import org.junit.Assert;
@@ -24,8 +21,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import static org.hamcrest.CoreMatchers.*;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
@@ -82,6 +77,15 @@ public class SimpleTweetTest {
Assert.fail();
}
+ try {
+ TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+ converter.prepare(null);
+ converter.process(new StreamsDatum(TWITTER_JSON));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
assertThat(activity, is(not(nullValue())));
assertThat(activity.getId(), is(not(nullValue())));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 6319ba8..f5e9978 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -165,10 +165,18 @@ public class StreamComponent {
public StreamsTask createConnectedTask() {
StreamsTask task;
if(this.processor != null) {
- task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
- task.addInputQueue(this.inQueue);
- for(Queue<StreamsDatum> q : this.outBound.values()) {
- task.addOutputQueue(q);
+ if(this.numTasks > 1) {
+ task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ } else {
+ task = new StreamsProcessorTask(this.processor);
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
}
}
else if(this.writer != null) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d8feb5ba/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 3799480..694cb76 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -3,6 +3,7 @@ package org.apache.streams.local.tasks;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.util.SerializationUtil;
import org.slf4j.Logger;
@@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask {
private ObjectMapper mapper;
public BaseStreamsTask() {
- this.mapper = new ObjectMapper();
+ this.mapper = new StreamsJacksonMapper();
this.mapper.registerSubtypes(Activity.class);
}