You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/02 23:32:28 UTC

git commit: Tweaks to Twitter & ES to restore examples

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning da2d80c74 -> b8fef9d1b


Tweaks to Twitter & ES to restore examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b8fef9d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b8fef9d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b8fef9d1

Branch: refs/heads/springcleaning
Commit: b8fef9d1b04734e959cf2aa01d5314f6aa0e844d
Parents: da2d80c
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 2 16:32:26 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 2 16:32:26 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 31 ++++++++++++++++
 .../ElasticsearchPersistReader.java             |  4 +-
 .../ElasticsearchPersistWriter.java             |  2 +
 .../ElasticsearchReaderConfiguration.json       | 18 ++++++---
 .../processor/TwitterEventProcessor.java        | 39 ++++++--------------
 .../twitter/processor/TwitterTypeConverter.java |  7 ++--
 .../provider/TwitterStreamConfigurator.java     | 27 ++++++++++----
 .../twitter/provider/TwitterStreamProvider.java | 19 ++++++++--
 .../provider/TwitterTimelineProvider.java       |  1 +
 .../streams/jackson/StreamsJacksonMapper.java   |  5 ++-
 .../streams/jackson/StreamsJacksonModule.java   |  5 +++
 .../streams/local/tasks/BaseStreamsTask.java    | 24 ++++++++----
 .../org/apache/streams/util/ComponentUtils.java | 18 +++++++++
 13 files changed, 143 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 224f7da..20b5c08 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -1,5 +1,6 @@
 package org.apache.streams.elasticsearch;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,6 +14,8 @@ public class ElasticsearchConfigurator {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfigurator.class);
 
+    private final static ObjectMapper mapper = new ObjectMapper();
+
     public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
         List<String> hosts = elasticsearch.getStringList("hosts");
         Long port = elasticsearch.getLong("port");
@@ -27,4 +30,32 @@ public class ElasticsearchConfigurator {
         return elasticsearchConfiguration;
     }
 
+    public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
+        // Detect the shared connection settings, then let the Jackson mapper convert them into
+        // the reader-specific configuration type.
+        ElasticsearchConfiguration shared = detectConfiguration(elasticsearch);
+        ElasticsearchReaderConfiguration readerConfiguration =
+                mapper.convertValue(shared, ElasticsearchReaderConfiguration.class);
+
+        // Reader-only settings: lists of indexes and types, read from the "indexes" and "types" keys.
+        readerConfiguration.setIndexes(elasticsearch.getStringList("indexes"));
+        readerConfiguration.setTypes(elasticsearch.getStringList("types"));
+
+        return readerConfiguration;
+    }
+
+    public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
+        // Detect the shared connection settings, then let the Jackson mapper convert them into
+        // the writer-specific configuration type.
+        ElasticsearchConfiguration shared = detectConfiguration(elasticsearch);
+        ElasticsearchWriterConfiguration writerConfiguration =
+                mapper.convertValue(shared, ElasticsearchWriterConfiguration.class);
+
+        // Writer-only settings: a single index and type, read from the "index" and "type" keys.
+        writerConfiguration.setIndex(elasticsearch.getString("index"));
+        writerConfiguration.setType(elasticsearch.getString("type"));
+
+        return writerConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8ffcbd5..cc9b3fc 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -105,8 +105,8 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     }
     public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
         this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-        indexes.add(elasticsearchConfiguration.getIndex());
-        types.add(elasticsearchConfiguration.getType());
+        indexes.addAll(elasticsearchConfiguration.getIndexes());
+        types.addAll(elasticsearchConfiguration.getTypes());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 9390219..ab35edd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -61,6 +61,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     private volatile int totalSent = 0;
     private volatile int totalSeconds = 0;
+    private volatile int totalAttempted = 0;
     private volatile int totalOk = 0;
     private volatile int totalFailed = 0;
     private volatile int totalBatchCount = 0;
@@ -310,6 +311,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                         thisOk++;
                 }
 
+                totalAttempted += thisSent;
                 totalOk += thisOk;
                 totalFailed += thisFailed;
                 totalSeconds += (thisMillis / 1000);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 698da1a..1f1c720 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -6,13 +6,19 @@
     "extends": {"$ref":"ElasticsearchConfiguration.json"},
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "index": {
-            "type": "string",
-            "description": "Index to write to"
+        "indexes": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            },
+            "description": "Indexes to read from"
         },
-        "type": {
-            "type": "string",
-            "description": "Type to write as"
+        "types": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            },
+            "description": "Types to read from"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index abc0c1a..270172f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -16,6 +16,7 @@ import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
 import org.apache.streams.twitter.serializer.*;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +42,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
     private Class inClass;
     private Class outClass;
 
-    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -65,22 +64,20 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
         while(true) {
             String item;
             try {
-                item = inQueue.take();
+
+                item = ComponentUtils.pollUntilStringNotEmpty(inQueue);
+
                 if(item instanceof String && item.equals(TERMINATE)) {
                     LOGGER.info("Terminating!");
                     break;
                 }
 
-                System.out.println(item);
+                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
 
-                if( StringUtils.isNotEmpty(item) ) {
-                    ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
+                StreamsDatum rawDatum = new StreamsDatum(objectNode);
 
-                    StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
-                    for (StreamsDatum entry : process(rawDatum)) {
-                        outQueue.offer(entry);
-                    }
+                for (StreamsDatum entry : process(rawDatum)) {
+                    ComponentUtils.offerUntilSuccess(entry, outQueue);
                 }
 
             } catch (Exception e) {
@@ -95,21 +92,9 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
         Object result = null;
 
         if( outClass.equals( Activity.class )) {
-            if( inClass.equals( Delete.class )) {
-                LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-            } else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.deserialize(
+                LOGGER.debug("ACTIVITY");
+                result = twitterJsonActivitySerializer.deserialize(
                         mapper.writeValueAsString(event));
-            } else if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-            } else {
-                return null;
-            }
         } else if( outClass.equals( Tweet.class )) {
             if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("TWEET");
@@ -210,7 +195,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
     @Override
     public void prepare(Object configurationObject) {
-
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 1c1e2fb..68820c9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -31,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
 
-    private ObjectMapper mapper = new StreamsTwitterMapper();
+    private ObjectMapper mapper;
 
     private Queue<StreamsDatum> inQueue;
     private Queue<StreamsDatum> outQueue;
@@ -39,7 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
     private Class inClass;
     private Class outClass;
 
-    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -176,7 +176,8 @@ public class TwitterTypeConverter implements StreamsProcessor {
 
     @Override
     public void prepare(Object o) {
-
+        mapper = new StreamsTwitterMapper();
+        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 2ae8d59..b1a1a07 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterBasicAuthConfiguration;
 import org.apache.streams.twitter.TwitterOAuthConfiguration;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.slf4j.Logger;
@@ -19,23 +20,35 @@ public class TwitterStreamConfigurator {
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class);
 
     public static TwitterStreamConfiguration detectConfiguration(Config twitter) {
-        Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
 
         TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration();
         twitterStreamConfiguration.setProtocol(twitter.getString("protocol"));
         twitterStreamConfiguration.setHost(twitter.getString("host"));
         twitterStreamConfiguration.setPort(twitter.getLong("port"));
         twitterStreamConfiguration.setVersion(twitter.getString("version"));
-        TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
-        twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
-        twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
-        twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
-        twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
-        twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+
+        try {
+            Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth");
+            TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration();
+            twitterBasicAuthConfiguration.setUsername(basicauth.getString("username"));
+            twitterBasicAuthConfiguration.setPassword(basicauth.getString("password"));
+            twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration);
+        } catch( ConfigException ce ) {}
+
+        try {
+            Config oauth = StreamsConfigurator.config.getConfig("twitter.oauth");
+            TwitterOAuthConfiguration twitterOAuthConfiguration = new TwitterOAuthConfiguration();
+            twitterOAuthConfiguration.setConsumerKey(oauth.getString("consumerKey"));
+            twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
+            twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
+            twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
+            twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+        } catch( ConfigException ce ) {}
 
         try {
             twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
         } catch( ConfigException ce ) {}
+
         try {
             List<Long> follows = Lists.newArrayList();
             for( Integer id : twitter.getIntList("follow"))

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 6a3def6..2b8a2f1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -13,6 +13,7 @@ import com.twitter.hbc.core.Constants;
 import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
 import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
 import com.twitter.hbc.httpclient.auth.Authentication;
@@ -135,7 +136,18 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
         Preconditions.checkNotNull(this.klass);
 
         Preconditions.checkNotNull(config.getEndpoint());
-        if(config.getEndpoint().endsWith("sample.json") ) {
+
+        if(config.getEndpoint().endsWith("user.json") ) {
+            endpoint = new UserstreamEndpoint();
+
+            Optional<String> with = Optional.fromNullable(config.getWith());
+            Optional<String> replies = Optional.fromNullable(config.getReplies());
+
+            if( with.isPresent() ) endpoint.addPostParameter("with", with.get());
+            if( replies.isPresent() ) endpoint.addPostParameter("replies", replies.get());
+
+        }
+        else if(config.getEndpoint().endsWith("sample.json") ) {
             endpoint = new StatusesSampleEndpoint();
 
             Optional<List<String>> track = Optional.fromNullable(config.getTrack());
@@ -143,6 +155,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
             if( track.isPresent() ) endpoint.addPostParameter("track", Joiner.on(",").join(track.get()));
             if( follow.isPresent() ) endpoint.addPostParameter("follow", Joiner.on(",").join(follow.get()));
+
         }
         else if( config.getEndpoint().endsWith("firehose.json"))
             endpoint = new StatusesFirehoseEndpoint();
@@ -174,12 +187,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
             return;
         }
 
-        endpoint.addPostParameter("with", config.getWith());
-        endpoint.addPostParameter("replies", config.getReplies());
 
         client = new ClientBuilder()
                 .name("apache/streams/streams-contrib/streams-provider-twitter")
-                .hosts(Constants.STREAM_HOST)
+                .hosts(config.getProtocol() + "://" + config.getHost())
                 .endpoint(endpoint)
                 .authentication(auth)
                 .processor(new StringDelimitedProcessor(inQueue))

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 40fb961..242d943 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -254,6 +254,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(config.getFollow());
 
+        Preconditions.checkArgument(config.getHost().equals("api.twitter.com"));
         Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
 
         Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index 275ed7e..7ef74ee 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -7,6 +7,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.joda.time.DateTime;
@@ -37,7 +38,9 @@ public class StreamsJacksonMapper extends ObjectMapper {
         configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
         configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
         configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
-        setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+        // If a user has an 'object' that does not have an explicit mapping, don't cause the serialization to fail.
+        configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
+        setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.DEFAULT);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 323e40c..bcf0f65 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -2,6 +2,7 @@ package org.apache.streams.jackson;
 
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.joda.time.DateTime;
+import org.joda.time.Period;
 
 /**
  * Created by sblackmon on 3/27/14.
@@ -12,6 +13,10 @@ public class StreamsJacksonModule extends SimpleModule {
         super();
         addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
         addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
+
+        addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
+        addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 694cb76..8006560 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -10,10 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 
 /**
  *
@@ -116,13 +113,13 @@ public abstract class BaseStreamsTask implements StreamsTask {
         try {
 
             if(datum.document instanceof ObjectNode) {
-                return new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid);
+                return copyMetaData(datum, new StreamsDatum(((ObjectNode) datum.document).deepCopy(), datum.timestamp, datum.sequenceid));
             }
             else if(datum.document instanceof Activity) {
 
-                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
+                return copyMetaData(datum, new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), Activity.class),
                                         datum.timestamp,
-                                        datum.sequenceid);
+                                        datum.sequenceid));
             }
 //            else if(this.mapper.canSerialize(datum.document.getClass())){
 //                return new StreamsDatum(this.mapper.readValue(this.mapper.writeValueAsString(datum.document), datum.document.getClass()),
@@ -156,4 +153,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
         }
         while( !success );
     }
+
+    private StreamsDatum copyMetaData(StreamsDatum copyFrom, StreamsDatum copyTo) {
+        // Mirrors every metadata entry of copyFrom into copyTo's metadata map, then returns copyTo.
+        Map<String, Object> source = copyFrom.getMetadata();
+        Map<String, Object> target = copyTo.getMetadata();
+        for(Map.Entry<String, Object> entry : source.entrySet()) {
+            Object original = entry.getValue();
+            // Serializable values are cloned via SerializationUtil; anything else is put as the same reference.
+            Object copied = (original instanceof Serializable) ? SerializationUtil.cloneBySerialization(original) : original;
+            target.put(entry.getKey(), copied);
+        }
+        return copyTo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b8fef9d1/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 5bf1d53..609d113 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -1,5 +1,7 @@
 package org.apache.streams.util;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Queue;
 
 /**
@@ -19,4 +21,20 @@ public class ComponentUtils {
         while( !success );
     }
 
+    public static String pollUntilStringNotEmpty(Queue queue) {
+        // Spins (yielding between attempts) until queue yields a non-null, non-empty String, and returns it.
+        String result = null;
+        do {
+            synchronized( ComponentUtils.class ) {
+                try {
+                    result = (String) queue.remove();
+                } catch( Exception e ) { /* deliberate best-effort: a failed remove() just means try again */ }
+            }
+            Thread.yield();
+        }
+        while( !StringUtils.isNotEmpty(result) );
+        // The loop condition used to be `result == null && ...`, which let an empty string end the loop.
+        return result;
+    }
+
 }