You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/29 01:28:57 UTC

[3/3] git commit: refactored to simplify; vastly improved test of activity serialization; fixed Jackson mapper configuration issues causing better tests to fail

Refactored to simplify.
Vastly improved the test of activity serialization.
Fixed Jackson mapper configuration issues that caused the improved tests to fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b59bcd28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b59bcd28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b59bcd28

Branch: refs/heads/springcleaning
Commit: b59bcd28b15b2bd912c5216c59e9d09a0268d5fe
Parents: 68ab754
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Mar 28 20:28:47 2014 -0400
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Mar 28 20:28:47 2014 -0400

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../data/MoreoverJsonActivitySerializer.java    |   4 +
 .../processor/TwitterEventProcessor.java        |  16 +-
 .../twitter/processor/TwitterTypeConverter.java |  13 +-
 .../TwitterJsonDeleteActivitySerializer.java    |  34 +-
 .../TwitterJsonEventActivitySerializer.java     | 140 ----
 .../TwitterJsonRetweetActivitySerializer.java   |  57 +-
 .../TwitterJsonTweetActivitySerializer.java     |  46 +-
 .../src/main/jsonschema/com/twitter/tweet.json  |  13 +-
 .../streams/twitter/test/TweetSerDeTest.java    |  64 +-
 .../src/test/resources/twitter_jsons.txt        | 810 ++++++++++++++++---
 .../streams/pig/StreamsProcessorExec.java       |   2 +-
 .../streams/pig/StreamsSerializerExec.java      |   9 -
 13 files changed, 868 insertions(+), 341 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index c4efa83..4f7a22f 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -43,6 +43,7 @@
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
         <module>streams-persist-mongo</module>
+        <module>streams-processor-urlredirect</module>
         <module>streams-provider-datasift</module>
         <module>streams-provider-facebook</module>
         <module>streams-provider-google</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
index da8c496..71456f8 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
@@ -17,6 +17,10 @@ import java.util.List;
  * Deserializes Moreover JSON format into Activities
  */
 public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
+
+    public MoreoverJsonActivitySerializer() {
+    }
+
     @Override
     public String serializationFormat() {
         return "application/json+vnd.moreover.com.v1";

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index e76d47c..00c8032 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -2,6 +2,7 @@ package org.apache.streams.twitter.processor;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
@@ -13,6 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
@@ -86,20 +88,23 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
         }
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException {
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
 
         Object result = null;
 
         if( outClass.equals( Activity.class )) {
             if( inClass.equals( Delete.class )) {
                 LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.convert(event);
+                result = twitterJsonDeleteActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else if ( inClass.equals( Retweet.class )) {
                 LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.convert(event);
+                result = twitterJsonRetweetActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.convert(event);
+                result = twitterJsonTweetActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else {
                 return null;
             }
@@ -180,6 +185,9 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
             } catch (ActivitySerializerException e) {
                 LOGGER.warn("Failed deserializing", e);
                 return Lists.newArrayList();
+            } catch (JsonProcessingException e) {
+                LOGGER.warn("Failed parsing JSON", e);
+                return Lists.newArrayList();
             }
 
             if( out != null && validate(out, outClass))

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index cd2d1ec..cc438b1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -2,6 +2,7 @@ package org.apache.streams.twitter.processor;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
@@ -13,6 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
 import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
@@ -59,20 +61,23 @@ public class TwitterTypeConverter implements StreamsProcessor {
         inQueue = inputQueue;
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException {
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
 
         Object result = null;
 
         if( outClass.equals( Activity.class )) {
             if( inClass.equals( Delete.class )) {
                 LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.convert(event);
+                result = twitterJsonDeleteActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else if ( inClass.equals( Retweet.class )) {
                 LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.convert(event);
+                result = twitterJsonRetweetActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.convert(event);
+                result = twitterJsonTweetActivitySerializer.deserialize(
+                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index 4e302f7..b24bc39 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -3,6 +3,8 @@ package org.apache.streams.twitter.serializer;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
@@ -10,6 +12,10 @@ import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
 
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
+
 /**
 * Created with IntelliJ IDEA.
 * User: mdelaet
@@ -17,13 +23,33 @@ import org.apache.streams.twitter.pojo.Tweet;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivitySerializer {
+public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String> {
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        return null;
+    }
 
     public Activity convert(ObjectNode event) throws ActivitySerializerException {
 
         Delete delete = null;
         try {
-            delete = mapper.treeToValue(event, Delete.class);
+            delete = TwitterJsonActivitySerializer.mapper.treeToValue(event, Delete.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         }
@@ -32,10 +58,10 @@ public class TwitterJsonDeleteActivitySerializer extends TwitterJsonEventActivit
         activity.setActor(buildActor(delete));
         activity.setVerb("delete");
         activity.setObject(buildActivityObject(delete));
-        activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+        activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
         if(Strings.isNullOrEmpty(activity.getId()))
             throw new ActivitySerializerException("Unable to determine activity id");
-        activity.setProvider(buildProvider(event));
+        activity.setProvider(getProvider());
         addTwitterExtension(activity, event);
         return activity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
deleted file mode 100644
index bc52359..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.*;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-import com.fasterxml.jackson.datatype.joda.JodaModule;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonModule;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Generator;
-import org.apache.streams.pojo.json.Icon;
-import org.apache.streams.pojo.json.Provider;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.text.ParseException;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public abstract class TwitterJsonEventActivitySerializer implements ActivitySerializer<String>, Serializable {
-
-    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
-    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
-
-    public static ObjectMapper mapper;
-    static {
-        mapper = new ObjectMapper();
-        //mapper.registerModule(new JodaModule());
-        mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-        mapper.registerModule(new StreamsJacksonModule() {
-            {
-                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
-                    @Override
-                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
-                        return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
-                    }
-                });
-            }
-        });
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+vnd.twitter.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Twitter JSON");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) throws ActivitySerializerException {
-        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
-        System.out.println(serialized);
-
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
-        try {
-            ObjectNode event = (ObjectNode) mapper.readTree(serialized);
-
-            System.out.println(event.toString());
-
-            Activity activity = convert(event);
-
-            System.out.println(activity.toString());
-
-            return activity;
-
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    public abstract Activity convert(ObjectNode event) throws ActivitySerializerException;
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException("Not currently implemented");
-    }
-
-    public static Generator buildGenerator(ObjectNode event) {
-        return null;
-    }
-
-    public static Icon getIcon(ObjectNode event) {
-        return null;
-    }
-
-    public static Provider buildProvider(ObjectNode event) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:twitter");
-        return provider;
-    }
-
-    public static String getUrls(ObjectNode event) {
-        return null;
-    }
-
-    public static void addTwitterExtension(Activity activity, ObjectNode event) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("twitter", event);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index dc1f0ab..7c08590 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -15,10 +15,12 @@ import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 
 /**
@@ -28,15 +30,32 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String> {
 
-    public Activity convert(ObjectNode event) throws ActivitySerializerException {
+    public TwitterJsonRetweetActivitySerializer() {
+
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public Activity deserialize(String event) throws ActivitySerializerException {
 
         Retweet retweet = null;
         try {
-            retweet = mapper.treeToValue(event, Retweet.class);
+            retweet = TwitterJsonActivitySerializer.mapper.readValue(event, Retweet.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
         }
 
         Activity activity = new Activity();
@@ -44,7 +63,12 @@ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivi
         activity.setVerb("share");
         if( retweet.getRetweetedStatus() != null )
             activity.setObject(buildActivityObject(retweet.getRetweetedStatus()));
-        activity.setId(formatId(activity.getVerb(), retweet.getIdStr()));
+        activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb(),
+                Optional.fromNullable(
+                        retweet.getIdStr())
+                        .or(Optional.of(retweet.getId().toString()))
+                        .orNull()
+        ));
         if(Strings.isNullOrEmpty(activity.getId()))
             throw new ActivitySerializerException("Unable to determine activity id");
         try {
@@ -52,18 +76,27 @@ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivi
         } catch( Exception e ) {
             throw new ActivitySerializerException("Unable to determine publishedDate", e);
         }
-        activity.setGenerator(buildGenerator(event));
-        activity.setIcon(getIcon(event));
-        activity.setProvider(buildProvider(event));
+        //activity.setGenerator(buildGenerator(mapper));
+        //activity.setIcon(getIcon(event));
+        activity.setProvider(TwitterJsonActivitySerializer.getProvider());
         activity.setTitle("");
-        activity.setContent(retweet.getRetweetedStatus().getText());
-        activity.setUrl(getUrls(event));
-        activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet));
-        addTwitterExtension(activity, event);
+        try {
+            activity.setContent(retweet.getRetweetedStatus().getText());
+        } catch( Exception e ) {
+            throw new ActivitySerializerException("Unable to determine content", e);
+        }
+        activity.setUrl("http://twitter.com/" + retweet.getIdStr());
+        activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet.getRetweetedStatus()));
+        addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(retweet, ObjectNode.class));
         addLocationExtension(activity, retweet);
         return activity;
     }
 
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        return null;
+    }
+
     public static Actor buildActor(Retweet retweet) {
         Actor actor = new Actor();
         User user = retweet.getUser();
@@ -95,7 +128,7 @@ public class TwitterJsonRetweetActivitySerializer extends TwitterJsonEventActivi
     public static void addLocationExtension(Activity activity, Retweet retweet) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = new HashMap<String, Object>();
-        location.put("id", formatId(retweet.getIdStr()));
+        location.put("id", TwitterJsonActivitySerializer.formatId(retweet.getIdStr()));
         location.put("coordinates", retweet.getCoordinates());
         extensions.put("location", location);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index dd40cd9..8bcb60b 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
@@ -14,10 +15,12 @@ import org.apache.streams.twitter.Url;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer.*;
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 
 /**
@@ -27,27 +30,36 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 * Time: 9:24 AM
 * To change this template use File | Settings | File Templates.
 */
-public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivitySerializer implements ActivitySerializer<String> {
+public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String> {
 
     public TwitterJsonTweetActivitySerializer() {
 
     }
 
-    public Activity convert(ObjectNode event) throws ActivitySerializerException {
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
 
         Tweet tweet = null;
         try {
-            tweet = mapper.treeToValue(event, Tweet.class);
+            tweet = TwitterJsonActivitySerializer.mapper.readValue(serialized, Tweet.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
         }
 
-        System.out.println("10");
-
         Activity activity = new Activity();
 
-        System.out.println("11");
-
         activity.setActor(buildActor(tweet));
         activity.setVerb("post");
         activity.setId(formatId(activity.getVerb(),
@@ -63,21 +75,24 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
             throw new ActivitySerializerException("Unable to determine publishedDate", e);
         }
         activity.setTarget(buildTarget(tweet));
-        activity.setGenerator(buildGenerator(event));
-        activity.setIcon(getIcon(event));
-        activity.setProvider(buildProvider(event));
+        activity.setProvider(getProvider());
         activity.setTitle("");
         activity.setContent(tweet.getText());
-        activity.setUrl(getUrls(event));
+        activity.setUrl("http://twitter.com/" + tweet.getIdStr());
         activity.setLinks(getLinks(tweet));
 
         System.out.println("12");
 
-        addTwitterExtension(activity, event);
+        addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class));
         addLocationExtension(activity, tweet);
         return activity;
     }
 
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        return null;
+    }
+
     public static Actor buildActor(Tweet tweet) {
         Actor actor = new Actor();
         User user = tweet.getUser();
@@ -96,10 +111,13 @@ public class TwitterJsonTweetActivitySerializer extends TwitterJsonEventActivity
 
     public static List<Object> getLinks(Tweet tweet) {
         List<Object> links = Lists.newArrayList();
-        if( tweet.getEntities().getUrls() != null )
-            for( Url url : tweet.getEntities().getUrls() ) {
+        if( tweet.getEntities().getUrls() != null ) {
+            for (Url url : tweet.getEntities().getUrls()) {
                 links.add(url.getExpandedUrl());
             }
+        }
+        else
+            System.out.println("  0 links");
         return links;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
index 32c93bb..6082fb8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/tweet.json
@@ -3,6 +3,7 @@
     "$schema": "http://json-schema.org/draft-03/schema",
     "id": "#",
     "javaType" : "org.apache.streams.twitter.pojo.Tweet",
+    "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "text": {
             "type": "string"
@@ -24,6 +25,7 @@
             "items": {
                 "type": "object",
                 "javaType" : "org.apache.streams.twitter.pojo.Contributor",
+                "javaInterfaces": ["java.io.Serializable"],
                 "properties": {
                     "id": {
                         "ignore_malformed": false,
@@ -41,6 +43,7 @@
         "coordinates": {
             "type": "object",
             "javaType" : "org.apache.streams.twitter.pojo.Coordinates",
+            "javaInterfaces": ["java.io.Serializable"],
             "items": {
                 "properties": {
                     "type": {
@@ -61,12 +64,14 @@
             "type": "object",
             "dynamic": "true",
             "javaType" : "org.apache.streams.twitter.pojo.Entities",
+            "javaInterfaces": ["java.io.Serializable"],
             "properties": {
                 "user_mentions": {
                     "type": "array",
                     "items": {
                         "type": "object",
                         "javaType" : "org.apache.streams.twitter.pojo.UserMentions",
+                        "javaInterfaces": ["java.io.Serializable"],
                         "properties": {
                             "id": {
                                 "ignore_malformed": false,
@@ -103,6 +108,7 @@
                     "items": {
                         "type": "object",
                         "javaType": "org.apache.streams.twitter.Url",
+                        "javaInterfaces": ["java.io.Serializable"],
                         "properties": {
                             "expanded_url": {
                                 "type": "string"
@@ -171,6 +177,7 @@
             "id": "user",
             "type": "object",
             "javaType" : "org.apache.streams.twitter.pojo.User",
+            "javaInterfaces": ["java.io.Serializable"],
             "dynamic": "true",
             "properties": {
                 "location": {
@@ -287,12 +294,6 @@
                     "type": "integer"
                 }
             }
-        },
-        "retweeted_status": {
-            "type": "object",
-            "required" : false,
-            "description" : "Describes the tweet being retweeted.",
-            "$ref" : "#"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b59bcd28/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index 0664595..c6dc0ad 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -9,9 +9,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.*;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -23,6 +21,7 @@ import java.io.InputStreamReader;
 
 import static java.util.regex.Pattern.matches;
 import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -35,21 +34,19 @@ import static org.junit.Assert.assertThat;
 public class TweetSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = TwitterJsonActivitySerializer.mapper;
 
-    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
     //    @Ignore
     @Test
     public void Tests()
     {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
         mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
         mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
 
-        InputStream is = TweetSerDeTest.class.getResourceAsStream("/twitter_jsons.txt");
+        InputStream is = TweetSerDeTest.class.getResourceAsStream("/testtweets.txt");
         InputStreamReader isr = new InputStreamReader(is);
         BufferedReader br = new BufferedReader(isr);
 
@@ -66,35 +63,38 @@ public class TweetSerDeTest {
 
                     assertThat(event, is(not(nullValue())));
 
-                    String tweetstring = mapper.writeValueAsString(event);
+                    if( detected == Tweet.class ) {
 
-                    LOGGER.info("{}: {}", detected.getName(), tweetstring);
+                        Tweet tweet = mapper.convertValue(event, Tweet.class);
 
-                    Activity activity;
-                    if( detected.equals( Delete.class )) {
-                        activity = twitterJsonDeleteActivitySerializer.convert(event);
-                    } else if ( detected.equals( Retweet.class )) {
-                        activity = twitterJsonRetweetActivitySerializer.convert(event);
-                    } else if ( detected.equals( Tweet.class )) {
-                        activity = twitterJsonTweetActivitySerializer.convert(event);
-                    } else {
-                        Assert.fail();
-                        return;
-                    }
+                        assertThat(tweet, is(not(nullValue())));
+                        assertThat(tweet.getCreatedAt(), is(not(nullValue())));
+                        assertThat(tweet.getText(), is(not(nullValue())));
+                        assertThat(tweet.getUser(), is(not(nullValue())));
+
+                    } else if( detected == Retweet.class ) {
+
+                        Retweet retweet = mapper.convertValue(event, Retweet.class);
 
-                    String activitystring = mapper.writeValueAsString(activity);
+                        assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue())));
+                        assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue())));
 
-                    LOGGER.info("activity: {}", activitystring);
+                    } else if( detected == Delete.class ) {
 
-                    assertThat(activity, is(not(nullValue())));
-                    if(activity.getId() != null) {
-                        assertThat(matches("id:.*:[a-z]*:[a-zA-Z0-9]*", activity.getId()), is(true));
+                        Delete delete = mapper.convertValue(event, Delete.class);
+
+                        assertThat(delete.getDelete(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue())));
+                        assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue())));
+
+                    } else {
+                        Assert.fail();
                     }
-                    assertThat(activity.getActor(), is(not(nullValue())));
-                    assertThat(activity.getActor().getId(), is(not(nullValue())));
-                    assertThat(activity.getVerb(), is(not(nullValue())));
-                    assertThat(activity.getObject(), is(not(nullValue())));
-                    assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
                 }
             }
         } catch( Exception e ) {