You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/02 23:32:28 UTC
git commit: Tweaks to Twitter & ES to restore examples
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning da2d80c74 -> b8fef9d1b
Tweaks to Twitter & ES to restore examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8fef9d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8fef9d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8fef9d1
Branch: refs/heads/springcleaning
Commit: b8fef9d1b04734e959cf2aa01d5314f6aa0e844d
Parents: da2d80c
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 16:32:26 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 16:32:26 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 31 ++++++++++++++++
.../ElasticsearchPersistReader.java | 4 +-
.../ElasticsearchPersistWriter.java | 2 +
.../ElasticsearchReaderConfiguration.json | 18 ++++++---
.../processor/TwitterEventProcessor.java | 39 ++++++--------------
.../twitter/processor/TwitterTypeConverter.java | 7 ++--
.../provider/TwitterStreamConfigurator.java | 27 ++++++++++----
.../twitter/provider/TwitterStreamProvider.java | 19 ++++++++--
.../provider/TwitterTimelineProvider.java | 1 +
.../streams/jackson/StreamsJacksonMapper.java | 5 ++-
.../streams/jackson/StreamsJacksonModule.java | 5 +++
.../streams/local/tasks/BaseStreamsTask.java | 24 ++++++++----
.../org/apache/streams/util/ComponentUtils.java | 18 +++++++++
13 files changed, 143 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 224f7da..20b5c08 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -1,5 +1,6 @@
package org.apache.streams.elasticsearch;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,6 +14,8 @@ public class ElasticsearchConfigurator {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfigurator.class);
+ private final static ObjectMapper mapper = new ObjectMapper();
+
public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
List<String> hosts = elasticsearch.getStringList("hosts");
Long port = elasticsearch.getLong("port");
@@ -27,4 +30,32 @@ public class ElasticsearchConfigurator {
return elasticsearchConfiguration;
}
+ public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
+
+ ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
+ ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+
+ List<String> indexes = elasticsearch.getStringList("indexes");
+ List<String> types = elasticsearch.getStringList("types");
+
+ elasticsearchReaderConfiguration.setIndexes(indexes);
+ elasticsearchReaderConfiguration.setTypes(types);
+
+ return elasticsearchReaderConfiguration;
+ }
+
+ public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
+
+ ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class);
+
+ String index = elasticsearch.getString("index");
+ String type = elasticsearch.getString("type");
+
+ elasticsearchWriterConfiguration.setIndex(index);
+ elasticsearchWriterConfiguration.setType(type);
+
+ return elasticsearchWriterConfiguration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8ffcbd5..cc9b3fc 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -105,8 +105,8 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
}
public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- indexes.add(elasticsearchConfiguration.getIndex());
- types.add(elasticsearchConfiguration.getType());
+ indexes.addAll(elasticsearchConfiguration.getIndexes());
+ types.addAll(elasticsearchConfiguration.getTypes());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 9390219..ab35edd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -61,6 +61,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private volatile int totalSent = 0;
private volatile int totalSeconds = 0;
+ private volatile int totalAttempted = 0;
private volatile int totalOk = 0;
private volatile int totalFailed = 0;
private volatile int totalBatchCount = 0;
@@ -310,6 +311,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
thisOk++;
}
+ totalAttempted += thisSent;
totalOk += thisOk;
totalFailed += thisFailed;
totalSeconds += (thisMillis / 1000);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 698da1a..1f1c720 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -6,13 +6,19 @@
"extends": {"$ref":"ElasticsearchConfiguration.json"},
"javaInterfaces": ["java.io.Serializable"],
"properties": {
- "index": {
- "type": "string",
- "description": "Index to write to"
+ "indexes": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "Indexes to read from"
},
- "type": {
- "type": "string",
- "description": "Type to write as"
+ "types": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "Types to read from"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index abc0c1a..270172f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -16,6 +16,7 @@ import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.apache.streams.twitter.serializer.*;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +42,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
private Class inClass;
private Class outClass;
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+ private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
public final static String TERMINATE = new String("TERMINATE");
@@ -65,22 +64,20 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
while(true) {
String item;
try {
- item = inQueue.take();
+
+ item = ComponentUtils.pollUntilStringNotEmpty(inQueue);
+
if(item instanceof String && item.equals(TERMINATE)) {
LOGGER.info("Terminating!");
break;
}
- System.out.println(item);
+ ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
- if( StringUtils.isNotEmpty(item) ) {
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+ StreamsDatum rawDatum = new StreamsDatum(objectNode);
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
- for (StreamsDatum entry : process(rawDatum)) {
- outQueue.offer(entry);
- }
+ for (StreamsDatum entry : process(rawDatum)) {
+ ComponentUtils.offerUntilSuccess(entry, outQueue);
}
} catch (Exception e) {
@@ -95,21 +92,9 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
Object result = null;
if( outClass.equals( Activity.class )) {
- if( inClass.equals( Delete.class )) {
- LOGGER.debug("ACTIVITY DELETE");
- result = twitterJsonDeleteActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("ACTIVITY RETWEET");
- result = twitterJsonRetweetActivitySerializer.deserialize(
+ LOGGER.debug("ACTIVITY");
+ result = twitterJsonActivitySerializer.deserialize(
mapper.writeValueAsString(event));
- } else if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("ACTIVITY TWEET");
- result = twitterJsonTweetActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else {
- return null;
- }
} else if( outClass.equals( Tweet.class )) {
if ( inClass.equals( Tweet.class )) {
LOGGER.debug("TWEET");
@@ -210,7 +195,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
@Override
public void prepare(Object configurationObject) {
-
+ twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 1c1e2fb..68820c9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -31,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
- private ObjectMapper mapper = new StreamsTwitterMapper();
+ private ObjectMapper mapper;
private Queue<StreamsDatum> inQueue;
private Queue<StreamsDatum> outQueue;
@@ -39,7 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
private Class inClass;
private Class outClass;
- private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+ private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
public final static String TERMINATE = new String("TERMINATE");
@@ -176,7 +176,8 @@ public class TwitterTypeConverter implements StreamsProcessor {
@Override
public void prepare(Object o) {
-
+ mapper = new StreamsTwitterMapper();
+ twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 2ae8d59..b1a1a07 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterBasicAuthConfiguration;
import org.apache.streams.twitter.TwitterOAuthConfiguration;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.slf4j.Logger;
@@ -19,23 +20,35 @@ public class TwitterStreamConfigurator {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class);
public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
- Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
twitterStreamConfiguration.setHost(twitter.getString("host"));
twitterStreamConfiguration.setPort(twitter.getLong("port"));
twitterStreamConfiguration.setVersion(twitter.getString("version"));
- TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
- twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
- twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
- twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
- twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
- twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+
+ try {
+ Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
+ TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration();
+ twitterBasicAuthConfiguration.setUsername(basicauth.getString("username"));
+ twitterBasicAuthConfiguration.setPassword(basicauth.getString("password"));
+ twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration);
+ } catch( ConfigException ce ) {}
+
+ try {
+ Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
+ TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
+ twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
+ twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
+ twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
+ twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
+ twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+ } catch( ConfigException ce ) {}
try {
twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
} catch( ConfigException ce ) {}
+
try {
List<Long> follows = Lists.newArrayList();
for( Integer id : twitter.getIntList("follow"))
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 6a3def6..2b8a2f1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -13,6 +13,7 @@ import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
@@ -135,7 +136,18 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(this.klass);
Preconditions.checkNotNull(config.getEndpoint());
- if(config.getEndpoint().endsWith("sample.json") ) {
+
+ if(config.getEndpoint().endsWith("user.json") ) {
+ endpoint = new UserstreamEndpoint();
+
+ Optional<String> with = Optional.fromNullable(config.getWith());
+ Optional<String> replies = Optional.fromNullable(config.getReplies());
+
+ if( with.isPresent() ) endpoint.addPostParameter("with", with.get());
+ if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get());
+
+ }
+ else if(config.getEndpoint().endsWith("sample.json") ) {
endpoint = new StatusesSampleEndpoint();
Optional<List<String>> track = Optional.fromNullable(config.getTrack());
@@ -143,6 +155,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
+
}
else if( config.getEndpoint().endsWith("firehose.json"))
endpoint = new StatusesFirehoseEndpoint();
@@ -174,12 +187,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
return;
}
- endpoint.addPostParameter("with", config.getWith());
- endpoint.addPostParameter("replies", config.getReplies());
client = new ClientBuilder()
.name("apache/streams/streams-contrib/streams-provider-twitter")
- .hosts(Constants.STREAM_HOST)
+ .hosts(config.getProtocol() + "://" + config.getHost())
.endpoint(endpoint)
.authentication(auth)
.processor(new StringDelimitedProcessor(inQueue))
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 40fb961..242d943 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -254,6 +254,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Preconditions.checkNotNull(config.getFollow());
+ Preconditions.checkArgument(config.getHost().equals("api.twitter.com"));
Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index 275ed7e..7ef74ee 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.joda.time.DateTime;
@@ -37,7 +38,9 @@ public class StreamsJacksonMapper extends ObjectMapper {
configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
- setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ // If a user has an 'object' that does not have an explicit mapping, don't cause the serialization to fail.
+ configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+ setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.DEFAULT);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 323e40c..bcf0f65 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -2,6 +2,7 @@ package org.apache.streams.jackson;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.joda.time.DateTime;
+import org.joda.time.Period;
/**
* Created by sblackmon on 3/27/14.
@@ -12,6 +13,10 @@ public class StreamsJacksonModule extends SimpleModule {
super();
addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
+
+ addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
+ addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 694cb76..8006560 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -10,10 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
/**
*
@@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask {
try {
if(datum.document instanceof ObjectNode) {
- return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+ return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
}
else if(datum.document instanceof Activity) {
- return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+ return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
datum.timestamp,
- datum.sequenceid);
+ datum.sequenceid));
}
// else if(this.mapper.canSerialize(datum.document.getClass())){
// return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
@@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
}
while( !success );
}
+
+ private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) {
+ Map<String, Object> fromMeta = copyFrom.getMetadata();
+ Map<String, Object> toMeta = copyTo.getMetadata();
+ for(String key : fromMeta.keySet()) {
+ Object value = fromMeta.get(key);
+ if(value instanceof Serializable)
+ toMeta.put(key, SerializationUtil.cloneBySerialization(value));
+ else //hope for the best - should be serializable
+ toMeta.put(key, value);
+ }
+ return copyTo;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 5bf1d53..609d113 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -1,5 +1,7 @@
package org.apache.streams.util;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Queue;
/**
@@ -19,4 +21,20 @@ public class ComponentUtils {
while( !success );
}
+ public static String pollUntilStringNotEmpty(Queue queue) {
+
+ String result = null;
+ do {
+ synchronized( ComponentUtils.class ) {
+ try {
+ result = (String) queue.remove();
+ } catch( Exception e ) {}
+ }
+ Thread.yield();
+ }
+ while( result == null && !StringUtils.isNotEmpty(result) );
+
+ return result;
+ }
+
}