You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/13 17:22:57 UTC
[07/14] git commit: fixes to get streams-examples working
fixes to get streams-examples working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7e4e1095
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7e4e1095
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7e4e1095
Branch: refs/heads/master
Commit: 7e4e1095f09321210d6ce3acb56f4041680f2d6d
Parents: 7293594
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu May 8 13:41:28 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu May 8 13:41:28 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 80 ++---------------
.../streams-provider-twitter/pom.xml | 13 +++
.../provider/TwitterEventClassifier.java | 10 ++-
.../twitter/provider/TwitterStreamProvider.java | 7 +-
.../provider/TwitterTimelineProvider.java | 7 +-
.../TwitterJsonActivitySerializer.java | 3 +-
.../TwitterJsonDeleteActivitySerializer.java | 3 +-
.../TwitterJsonRetweetActivitySerializer.java | 3 +-
.../TwitterJsonTweetActivitySerializer.java | 7 +-
...erJsonUserstreameventActivitySerializer.java | 92 ++++++++++++++++++++
.../main/jsonschema/com/twitter/FriendList.json | 20 +++++
.../jsonschema/com/twitter/UserstreamEvent.json | 47 ++++++++++
.../jackson/StreamsDateTimeDeserializer.java | 3 +-
.../jackson/StreamsDateTimeSerializer.java | 5 +-
.../jackson/StreamsPeriodDeserializer.java | 3 +-
.../jackson/StreamsPeriodSerializer.java | 3 +-
16 files changed, 211 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 80d2775..e99c4ff 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -47,10 +47,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
private static final long WAITING_DOCS_LIMIT = 10000;
- private static final int BYTES_IN_MB = 1024 * 1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-
- private final List<String> affectedIndexes = new ArrayList<String>();
+ private static final int BYTES_IN_MB = 1024*1024;
+ private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
private ObjectMapper mapper = new StreamsJacksonMapper();
private ElasticsearchClientManager manager;
@@ -62,8 +60,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private long batchSize;
private boolean veryLargeBulk; // by default this setting is set to false
- private int totalRecordsWritten = 0;
-
+
protected Thread task;
protected volatile Queue<StreamsDatum> persistQueue;
@@ -79,8 +76,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private volatile long totalSizeInBytes = 0;
private volatile long batchSizeInBytes = 0;
private volatile int batchItemsSent = 0;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
public ElasticsearchPersistWriter() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
@@ -121,8 +118,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
return totalOk;
}
- private ObjectMapper mapper = new StreamsJacksonMapper();
-
public int getTotalFailed() {
return totalFailed;
}
@@ -155,13 +150,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
return (client != null);
}
- private ElasticsearchWriterConfiguration config;
-
- private static final int BYTES_IN_MB = 1024*1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
-
@Override
public void write(StreamsDatum streamsDatum) {
@@ -254,6 +242,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void prepare(Object configurationObject) {
mapper = StreamsJacksonMapper.getInstance();
+ veryLargeBulk = this.config.getBulk();
+ batchSize = this.config.getBatchSize();
start();
}
@@ -395,18 +385,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
}
- private void trackItemAndBytesWritten(long sizeInBytes)
- {
- currentItems++;
- batchItemsSent++;
- batchSizeInBytes += sizeInBytes;
-
- // If our queue is larger than our flush threashold, then we should flush the queue.
- if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
- (currentItems >= batchSize) )
- flushInternal();
- }
-
private void checkAndCreateBulkRequest()
{
// Synchronize to ensure that we don't lose any records
@@ -512,14 +490,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
return toReturn;
}
- @Override
- public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
- veryLargeBulk = this.config.getBulk();
- batchSize = this.config.getBatchSize();
- start();
- }
-
private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
@Override
@@ -571,40 +541,4 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
flushInternal();
}
- private void checkAndCreateBulkRequest() {
- // Synchronize to ensure that we don't lose any records
- synchronized (this) {
- if (bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- }
- }
-
- private void checkIndexImplications(String indexName) {
-
- // check to see if we have seen this index before.
- if (this.affectedIndexes.contains(indexName))
- return;
-
- // we haven't log this index.
- this.affectedIndexes.add(indexName);
-
- // Check to see if we are in 'veryLargeBulk' mode
- // if we aren't, exit early
- if (!this.veryLargeBulk)
- return;
-
-
- // They are in 'very large bulk' mode we want to turn off refreshing the index.
-
- // Create a request then add the setting to tell it to stop refreshing the interval
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
-
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 8a41ca5..c104810 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -49,6 +49,11 @@
<artifactId>streams-config</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -70,6 +75,12 @@
<artifactId>twitter4j-core</artifactId>
<version>[4.0,)</version>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -116,6 +127,8 @@
<sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
+ <sourcePath>src/main/jsonschema/com/twitter/UserstreamEvent.json</sourcePath>
+ <sourcePath>src/main/jsonschema/com/twitter/FriendList.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
<sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>
</sourcePaths>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index b577e42..3a8bf6d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -2,11 +2,8 @@ package org.apache.streams.twitter.provider;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
-import com.jayway.jsonassert.JsonAssert;
import org.apache.commons.lang.StringUtils;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.*;
import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import java.io.IOException;
@@ -47,6 +44,11 @@ public class TwitterEventClassifier {
return Retweet.class;
else if( objectNode.findValue("delete") != null )
return Delete.class;
+ else if( objectNode.findValue("friends") != null ||
+ objectNode.findValue("friends_str") != null )
+ return FriendList.class;
+ else if( objectNode.findValue("target_object") != null )
+ return UserstreamEvent.class;
else
return Tweet.class;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index ba88803..fb160ef 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -50,9 +50,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
this.config = config;
}
- protected BlockingQueue hosebirdQueue = new LinkedBlockingQueue<String>(1000);
+ protected BlockingQueue<String> hosebirdQueue;
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+ protected volatile Queue<StreamsDatum> providerQueue;
protected Hosts hosebirdHosts;
protected Authentication auth;
@@ -194,6 +194,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[] {hosebirdHosts,endpoint,auth});
+ hosebirdQueue = new LinkedBlockingQueue<String>(1000);
+ providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000);
+
client = new ClientBuilder()
.name("apache/streams/streams-contrib/streams-provider-twitter")
.hosts(hosebirdHosts)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index db1ec76..06ba7be 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -23,9 +23,7 @@ import twitter4j.json.DataObjectFactory;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -260,7 +258,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
.setAsyncNumThreads(3)
.setRestBaseURL(baseUrl)
.setIncludeMyRetweetEnabled(Boolean.TRUE)
- .setIncludeRTsEnabled(Boolean.TRUE)
+ // not sure where this method went...
+ //.setIncludeRTsEnabled(Boolean.TRUE)
.setPrettyDebugEnabled(Boolean.TRUE);
return new TwitterFactory(builder.build()).getInstance();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index bfceae0..0ab3448 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -31,13 +31,14 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* Created by sblackmon on 3/26/14.
*/
-public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
+public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable
{
public TwitterJsonActivitySerializer() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index 40be0f6..4a0f348 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -13,6 +13,7 @@ import org.apache.streams.pojo.json.Actor;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Tweet;
+import java.io.Serializable;
import java.util.List;
import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
@@ -24,7 +25,7 @@ import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerialize
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
@Override
public String serializationFormat() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 51e39b1..f8ade3f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -17,6 +17,7 @@ import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import java.io.IOException;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,7 +32,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable {
public TwitterJsonRetweetActivitySerializer() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index b141482..1d00b7a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -18,6 +18,7 @@ import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import java.io.IOException;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,11 +33,7 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
* Time: 9:24 AM
* To change this template use File | Settings | File Templates.
*/
-public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String> {
-
- public TwitterJsonTweetActivitySerializer() {
-
- }
+public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
@Override
public String serializationFormat() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
new file mode 100644
index 0000000..c0768e3
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -0,0 +1,92 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
+
+/**
+* Created with IntelliJ IDEA.
+* User: mdelaet
+* Date: 9/30/13
+* Time: 9:24 AM
+* To change this template use File | Settings | File Templates.
+*/
+public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> {
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public String serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(String serialized) throws ActivitySerializerException {
+ return null;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ return null;
+ }
+
+ public Activity convert(ObjectNode item) throws ActivitySerializerException {
+
+ ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+ UserstreamEvent event = null;
+ try {
+ event = mapper.treeToValue(item, UserstreamEvent.class);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+
+ Activity activity = new Activity();
+ activity.setActor(buildActor(event));
+ activity.setVerb(detectVerb(event));
+ activity.setObject(buildActivityObject(event));
+ activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb()));
+ if(Strings.isNullOrEmpty(activity.getId()))
+ throw new ActivitySerializerException("Unable to determine activity id");
+ activity.setProvider(getProvider());
+ return activity;
+ }
+
+ public Actor buildActor(UserstreamEvent event) {
+ Actor actor = new Actor();
+ //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+ return actor;
+ }
+
+ public ActivityObject buildActivityObject(UserstreamEvent event) {
+ ActivityObject actObj = new ActivityObject();
+ //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+ //actObj.setObjectType("tweet");
+ return actObj;
+ }
+
+ public String detectVerb(UserstreamEvent event) {
+ return null;
+ }
+
+ public ActivityObject buildTarget(UserstreamEvent event) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
new file mode 100644
index 0000000..5dd7687
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/FriendList.json
@@ -0,0 +1,20 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.twitter.pojo.FriendList",
+ "properties": {
+ "friends": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "friends_str": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
new file mode 100644
index 0000000..07b5883
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
@@ -0,0 +1,47 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.twitter.pojo.UserstreamEvent",
+ "properties": {
+ "created_at": {
+ "type": "string",
+ "format" : "date-time"
+ },
+ "event_type": {
+ "type": "string",
+ "enum" : [
+ "access_revoked",
+ "block",
+ "unblock",
+ "favorite",
+ "unfavorite",
+ "follow",
+ "unfollow",
+ "list_created",
+ "list_destroyed",
+ "list_updated",
+ "list_member_added",
+ "list_member_removed",
+ "list_user_subscribed",
+ "list_user_unsubscribed",
+ "user_update"
+ ]
+ },
+ "source": {
+ "type": "string",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "target": {
+ "type": "string",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "target_object": {
+ "type": "object"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 3286e74..28f297c 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -7,11 +7,12 @@ import org.apache.streams.data.util.RFC3339Utils;
import org.joda.time.DateTime;
import java.io.IOException;
+import java.io.Serializable;
/**
* Created by sblackmon on 3/27/14.
*/
-public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
+public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable {
protected StreamsDateTimeDeserializer(Class<DateTime> dateTimeClass) {
super(dateTimeClass);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index 26bc157..89f9100 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -11,11 +11,14 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
+import java.io.Serializable;
/**
* Created by sblackmon on 3/27/14.
*/
-public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
+public class StreamsDateTimeSerializer extends StdSerializer<DateTime> implements Serializable {
+
+
protected StreamsDateTimeSerializer(Class<DateTime> dateTimeClass) {
super(dateTimeClass);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
index ff765f6..9402878 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodDeserializer.java
@@ -7,8 +7,9 @@ import org.apache.streams.data.util.RFC3339Utils;
import org.joda.time.Period;
import java.io.IOException;
+import java.io.Serializable;
-public class StreamsPeriodDeserializer extends StdDeserializer<Period>
+public class StreamsPeriodDeserializer extends StdDeserializer<Period> implements Serializable
{
protected StreamsPeriodDeserializer(Class<Period> dateTimeClass) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7e4e1095/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
index 614cbdd..ef28c8e 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsPeriodSerializer.java
@@ -8,8 +8,9 @@ import org.joda.time.DateTime;
import org.joda.time.Period;
import java.io.IOException;
+import java.io.Serializable;
-public class StreamsPeriodSerializer extends StdSerializer<Period>
+public class StreamsPeriodSerializer extends StdSerializer<Period> implements Serializable
{
protected StreamsPeriodSerializer(Class<Period> dateTimeClass) {
super(dateTimeClass);