You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/13 17:22:57 UTC

[07/14] git commit: fixes to get streams-examples working

fixes to get streams-examples working


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e4e1095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e4e1095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e4e1095

Branch: refs/heads/master
Commit: 7e4e1095f09321210d6ce3acb56f4041680f2d6d
Parents: 7293594
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu May 8 13:41:28 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu May 8 13:41:28 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 80 ++---------------
 .../streams-provider-twitter/pom.xml            | 13 +++
 .../provider/TwitterEventClassifier.java        | 10 ++-
 .../twitter/provider/TwitterStreamProvider.java |  7 +-
 .../provider/TwitterTimelineProvider.java       |  7 +-
 .../TwitterJsonActivitySerializer.java          |  3 +-
 .../TwitterJsonDeleteActivitySerializer.java    |  3 +-
 .../TwitterJsonRetweetActivitySerializer.java   |  3 +-
 .../TwitterJsonTweetActivitySerializer.java     |  7 +-
 ...erJsonUserstreameventActivitySerializer.java | 92 ++++++++++++++++++++
 .../main/jsonschema/com/twitter/FriendList.json | 20 +++++
 .../jsonschema/com/twitter/UserstreamEvent.json | 47 ++++++++++
 .../jackson/StreamsDateTimeDeserializer.java    |  3 +-
 .../jackson/StreamsDateTimeSerializer.java      |  5 +-
 .../jackson/StreamsPeriodDeserializer.java      |  3 +-
 .../jackson/StreamsPeriodSerializer.java        |  3 +-
 16 files changed, 211 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 80d2775..e99c4ff 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -47,10 +47,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
     private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
     private static final long WAITING_DOCS_LIMIT = 10000;
-    private static final int BYTES_IN_MB = 1024 * 1024;
-    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-
-    private final List<String> affectedIndexes = new ArrayList<String>();
+    private static final int  BYTES_IN_MB = 1024*1024;
+    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
 
     private ObjectMapper mapper = new StreamsJacksonMapper();
     private ElasticsearchClientManager manager;
@@ -62,8 +60,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     private long batchSize;
     private boolean veryLargeBulk;  // by default this setting is set to false
-    private int totalRecordsWritten = 0;
-    
+
     protected Thread task;
 
     protected volatile Queue<StreamsDatum> persistQueue;
@@ -79,8 +76,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private volatile long totalSizeInBytes = 0;
     private volatile long batchSizeInBytes = 0;
     private volatile int batchItemsSent = 0;
-    private volatile int totalByteCount = 0;
-    private volatile int byteCount = 0;
+    private volatile int  totalByteCount = 0;
+    private volatile int  byteCount = 0;
 
     public ElasticsearchPersistWriter() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
@@ -121,8 +118,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         return totalOk;
     }
     
-    private ObjectMapper mapper = new StreamsJacksonMapper();
-
     public int getTotalFailed() {
         return totalFailed;
     }
@@ -155,13 +150,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         return (client != null);
     }
 
-    private ElasticsearchWriterConfiguration config;
-
-    private static final int  BYTES_IN_MB = 1024*1024;
-    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-    private volatile int  totalByteCount = 0;
-    private volatile int  byteCount = 0;
-    
     @Override
     public void write(StreamsDatum streamsDatum) {
 
@@ -254,6 +242,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     @Override
     public void prepare(Object configurationObject) {
         mapper = StreamsJacksonMapper.getInstance();
+        veryLargeBulk = this.config.getBulk();
+        batchSize = this.config.getBatchSize();
         start();
     }
 
@@ -395,18 +385,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         }
     }
 
-    private void trackItemAndBytesWritten(long sizeInBytes)
-    {
-        currentItems++;
-        batchItemsSent++;
-        batchSizeInBytes += sizeInBytes;
-
-        // If our queue is larger than our flush threashold, then we should flush the queue.
-        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
-                (currentItems >= batchSize) )
-            flushInternal();
-    }
-
     private void checkAndCreateBulkRequest()
     {
         // Synchronize to ensure that we don't lose any records
@@ -512,14 +490,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         return toReturn;
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        veryLargeBulk = this.config.getBulk();
-        batchSize = this.config.getBatchSize();
-        start();
-    }
-    
     private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
         bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
             @Override
@@ -571,40 +541,4 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             flushInternal();
     }
 
-    private void checkAndCreateBulkRequest() {
-        // Synchronize to ensure that we don't lose any records
-        synchronized (this) {
-            if (bulkRequest == null)
-                bulkRequest = this.manager.getClient().prepareBulk();
-        }
-    }
-
-    private void checkIndexImplications(String indexName) {
-
-        // check to see if we have seen this index before.
-        if (this.affectedIndexes.contains(indexName))
-            return;
-
-        // we haven't log this index.
-        this.affectedIndexes.add(indexName);
-
-        // Check to see if we are in 'veryLargeBulk' mode
-        // if we aren't, exit early
-        if (!this.veryLargeBulk)
-            return;
-
-
-        // They are in 'very large bulk' mode we want to turn off refreshing the index.
-
-        // Create a request then add the setting to tell it to stop refreshing the interval
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
-
-        // submit to ElasticSearch
-        this.manager.getClient()
-                .admin()
-                .indices()
-                .updateSettings(updateSettingsRequest)
-                .actionGet();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 8a41ca5..c104810 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -49,6 +49,11 @@
             <artifactId>streams-config</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -70,6 +75,12 @@
             <artifactId>twitter4j-core</artifactId>
             <version>[4.0,)</version>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -116,6 +127,8 @@
                         <sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/twitter/UserstreamEvent.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/twitter/FriendList.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
                         <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>
                     </sourcePaths>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index b577e42..3a8bf6d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -2,11 +2,8 @@ package org.apache.streams.twitter.provider;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.jayway.jsonassert.JsonAssert;
 import org.apache.commons.lang.StringUtils;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.*;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 
 import java.io.IOException;
@@ -47,6 +44,11 @@ public class TwitterEventClassifier {
             return Retweet.class;
         else if( objectNode.findValue("delete") != null )
             return Delete.class;
+        else if( objectNode.findValue("friends") != null ||
+                 objectNode.findValue("friends_str") != null )
+            return FriendList.class;
+        else if( objectNode.findValue("target_object") != null )
+            return UserstreamEvent.class;
         else
             return Tweet.class;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index ba88803..fb160ef 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -50,9 +50,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
         this.config = config;
     }
 
-    protected BlockingQueue hosebirdQueue = new LinkedBlockingQueue<String>(1000);
+    protected BlockingQueue<String> hosebirdQueue;
 
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+    protected volatile Queue<StreamsDatum> providerQueue;
 
     protected Hosts hosebirdHosts;
     protected Authentication auth;
@@ -194,6 +194,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
 
         LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
 
+        hosebirdQueue = new LinkedBlockingQueue<String>(1000);
+        providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+
         client = new ClientBuilder()
             .name("apache/streams/streams-contrib/streams-provider-twitter")
             .hosts(hosebirdHosts)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index db1ec76..06ba7be 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -23,9 +23,7 @@ import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -260,7 +258,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
                 .setAsyncNumThreads(3)
                 .setRestBaseURL(baseUrl)
                 .setIncludeMyRetweetEnabled(Boolean.TRUE)
-                .setIncludeRTsEnabled(Boolean.TRUE)
+                // not sure where this method went...
+                //.setIncludeRTsEnabled(Boolean.TRUE)
                 .setPrettyDebugEnabled(Boolean.TRUE);
 
         return new TwitterFactory(builder.build()).getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index bfceae0..0ab3448 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -31,13 +31,14 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Created by sblackmon on 3/26/14.
  */
-public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
+public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable
 {
 
     public TwitterJsonActivitySerializer() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index 40be0f6..4a0f348 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -13,6 +13,7 @@ import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
 
+import java.io.Serializable;
 import java.util.List;
 
 import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
@@ -24,7 +25,7 @@ import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerialize
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
 
     @Override
     public String serializationFormat() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 51e39b1..f8ade3f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -17,6 +17,7 @@ import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +32,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable {
 
     public TwitterJsonRetweetActivitySerializer() {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index b141482..1d00b7a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -18,6 +18,7 @@ import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,11 +33,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String> {
-
-    public TwitterJsonTweetActivitySerializer() {
-
-    }
+public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
 
     @Override
     public String serializationFormat() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
new file mode 100644
index 0000000..c0768e3
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -0,0 +1,92 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
+
+/**
+* Created with IntelliJ IDEA.
+* User: mdelaet
+* Date: 9/30/13
+* Time: 9:24 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> {
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        return null;
+    }
+
+    public Activity convert(ObjectNode item) throws ActivitySerializerException {
+
+        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+        UserstreamEvent event = null;
+        try {
+            event = mapper.treeToValue(item, UserstreamEvent.class);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event));
+        activity.setVerb(detectVerb(event));
+        activity.setObject(buildActivityObject(event));
+        activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb()));
+        if(Strings.isNullOrEmpty(activity.getId()))
+            throw new ActivitySerializerException("Unable to determine activity id");
+        activity.setProvider(getProvider());
+        return activity;
+    }
+
+    public Actor buildActor(UserstreamEvent event) {
+        Actor actor = new Actor();
+        //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+        return actor;
+    }
+
+    public ActivityObject buildActivityObject(UserstreamEvent event) {
+        ActivityObject actObj = new ActivityObject();
+        //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+        //actObj.setObjectType("tweet");
+        return actObj;
+    }
+
+    public String detectVerb(UserstreamEvent event) {
+        return null;
+    }
+
+    public ActivityObject buildTarget(UserstreamEvent event) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
new file mode 100644
index 0000000..5dd7687
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
@@ -0,0 +1,20 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.pojo.FriendList",
+    "properties": {
+        "friends": {
+            "type": "array",
+            "items": {
+                "type": "integer"
+            }
+        },
+        "friends_str": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
new file mode 100644
index 0000000..07b5883
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
@@ -0,0 +1,47 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.pojo.UserstreamEvent",
+    "properties": {
+        "created_at": {
+            "type": "string",
+            "format" : "date-time"
+        },
+        "event_type": {
+            "type": "string",
+            "enum" : [
+                "access_revoked",
+                "block",
+                "unblock",
+                "favorite",
+                "unfavorite",
+                "follow",
+                "unfollow",
+                "list_created",
+                "list_destroyed",
+                "list_updated",
+                "list_member_added",
+                "list_member_removed",
+                "list_user_subscribed",
+                "list_user_unsubscribed",
+                "user_update"
+            ]
+        },
+        "source": {
+            "type": "string",
+            "items": {
+                "type": "integer"
+            }
+        },
+        "target": {
+            "type": "string",
+            "items": {
+                "type": "integer"
+            }
+        },
+        "target_object": {
+            "type": "object"
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 3286e74..28f297c 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -7,11 +7,12 @@ import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.DateTime;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Created by sblackmon on 3/27/14.
  */
-public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
+public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable {
 
     protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass) {
         super(dateTimeClass);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index 26bc157..89f9100 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -11,11 +11,14 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Created by sblackmon on 3/27/14.
  */
-public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
+public class StreamsDateTimeSerializer extends StdSerializer<DateTime> implements Serializable {
+
+
 
     protected StreamsDateTimeSerializer(Class<DateTime> dateTimeClass) {
         super(dateTimeClass);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
index ff765f6..9402878 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
@@ -7,8 +7,9 @@ import org.apache.streams.data.util.RFC3339Utils;
 import org.joda.time.Period;
 
 import java.io.IOException;
+import java.io.Serializable;
 
-public class StreamsPeriodDeserializer extends StdDeserializer<Period>
+public class StreamsPeriodDeserializer extends StdDeserializer<Period> implements Serializable
 {
 
     protected StreamsPeriodDeserializer(Class<Period> dateTimeClass) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
index 614cbdd..ef28c8e 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
@@ -8,8 +8,9 @@ import org.joda.time.DateTime;
 import org.joda.time.Period;
 
 import java.io.IOException;
+import java.io.Serializable;
 
-public class StreamsPeriodSerializer extends StdSerializer<Period>
+public class StreamsPeriodSerializer extends StdSerializer<Period> implements Serializable
 {
     protected StreamsPeriodSerializer(Class<Period> dateTimeClass) {
         super(dateTimeClass);