You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/12 00:51:32 UTC

[1/9] incubator-streams git commit: use StreamsJacksonMapper with channel-specific date support deprecate StreamsTwitterMapper deprecate StreamsDatasiftMapper use generic TypeConverterProcessor deprecate/delete now unnecessary classes refactor event clas

Repository: incubator-streams
Updated Branches:
  refs/heads/master bfa9466b0 -> 99e70b48c


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterEventClassifierTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterEventClassifierTest.java
index e416ec7..4c7f2f3 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterEventClassifierTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TwitterEventClassifierTest.java
@@ -18,11 +18,16 @@
 
 package org.apache.streams.twitter.test;
 
+import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,4 +68,33 @@ public class TwitterEventClassifierTest {
         if( !result.equals(User.class) )
             Assert.fail();
     }
+
+    /**
+     * The classifier must hand back a {@link TwitterJsonTweetActivitySerializer}
+     * for the {@code tweet} fixture.
+     */
+    @Test
+    public void testDetectTweetSerializer() {
+        ActivitySerializer detected = TwitterEventClassifier.bestSerializer(tweet);
+        Assert.assertTrue(detected instanceof TwitterJsonTweetActivitySerializer);
+    }
+
+    /**
+     * The classifier must hand back a {@link TwitterJsonRetweetActivitySerializer}
+     * for the {@code retweet} fixture.
+     */
+    @Test
+    public void testDetectRetweetSerializer() {
+        ActivitySerializer detected = TwitterEventClassifier.bestSerializer(retweet);
+        Assert.assertTrue(detected instanceof TwitterJsonRetweetActivitySerializer);
+    }
+
+    /**
+     * The classifier must hand back a {@link TwitterJsonDeleteActivitySerializer}
+     * for the {@code delete} fixture.
+     */
+    @Test
+    public void testDetectDeleteSerializer() {
+        ActivitySerializer detected = TwitterEventClassifier.bestSerializer(delete);
+        Assert.assertTrue(detected instanceof TwitterJsonDeleteActivitySerializer);
+    }
+
+    /**
+     * The classifier must hand back a {@link TwitterJsonUserActivitySerializer}
+     * for the {@code user} fixture.
+     */
+    @Test
+    public void testDetectUserSerializer() {
+        ActivitySerializer detected = TwitterEventClassifier.bestSerializer(user);
+        Assert.assertTrue(detected instanceof TwitterJsonUserActivitySerializer);
+    }
+
 }


[6/9] incubator-streams git commit: need to add these classes to SCM

Posted by sb...@apache.org.
need to add these classes to SCM


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/800bce96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/800bce96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/800bce96

Branch: refs/heads/master
Commit: 800bce964753ab4d4c8fe1a0fbf13b1557c7c48b
Parents: 612676d
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 7 10:58:39 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:58:39 2014 -0800

----------------------------------------------------------------------
 .../serializer/DatasiftEventClassifier.java     |  35 +++
 .../DatasiftInteractionActivitySerializer.java  | 222 +++++++++++++++++++
 2 files changed, 257 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/800bce96/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
new file mode 100644
index 0000000..e90de6a
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -0,0 +1,35 @@
+package org.apache.streams.datasift.serializer;
+
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.twitter.Twitter;
+
+/**
+ * Inspects a {@link Datasift} event and decides which channel-specific
+ * POJO class and which {@link ActivitySerializer} apply to it.
+ *
+ * Twitter is checked first, then Instagram; anything else is treated as a
+ * generic interaction.
+ *
+ * Created by sblackmon on 11/6/14.
+ */
+public class DatasiftEventClassifier {
+
+    /**
+     * Determines the POJO class matching the payload carried by the event.
+     *
+     * @param event the Datasift event to inspect; must not be null
+     * @return {@code Twitter.class} when a twitter section is present,
+     *         {@code Instagram.class} when an instagram section is present,
+     *         {@code Interaction.class} otherwise
+     */
+    public static Class detectClass(Datasift event) {
+
+        if(event.getTwitter() != null) {
+            return Twitter.class;
+        }
+        if(event.getInstagram() != null) {
+            return Instagram.class;
+        }
+        return Interaction.class;
+    }
+
+    /**
+     * Selects the serializer singleton suited to the payload carried by the event.
+     *
+     * @param event the Datasift event to inspect; must not be null
+     * @return the tweet serializer when a twitter section is present, the
+     *         instagram serializer when an instagram section is present,
+     *         the generic interaction serializer otherwise
+     */
+    public static ActivitySerializer bestSerializer(Datasift event) {
+
+        if(event.getTwitter() != null) {
+            return DatasiftTweetActivitySerializer.getInstance();
+        }
+        if(event.getInstagram() != null) {
+            return DatasiftInstagramActivitySerializer.getInstance();
+        }
+        return DatasiftInteractionActivitySerializer.getInstance();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/800bce96/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
new file mode 100644
index 0000000..c856dc2
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
@@ -0,0 +1,222 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.links.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Converts Datasift events into Streams {@link Activity} objects using the
+ * generic {@link Interaction} section of the event.
+ *
+ * Only the Datasift-to-Activity direction is implemented;
+ * {@link #serialize(Activity)} always throws {@link UnsupportedOperationException}.
+ */
+public class DatasiftInteractionActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivitySerializer.class);
+
+    /** Shared instance handed out by {@link #getInstance()}. */
+    private static final DatasiftInteractionActivitySerializer instance = new DatasiftInteractionActivitySerializer();
+
+    /**
+     * @return the shared serializer instance
+     */
+    public static DatasiftInteractionActivitySerializer getInstance() {
+        return instance;
+    }
+
+    /** Mapper used by {@link #deserialize(String)} to parse raw Datasift JSON. */
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
+
+    /**
+     * @return the format identifier of the documents this serializer reads
+     */
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    /**
+     * Not supported.
+     *
+     * @param deserialized ignored
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Datasift serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    /**
+     * Parses a Datasift JSON document and converts it to an {@link Activity}.
+     *
+     * @param datasiftJson raw JSON text of a single Datasift event
+     * @return the converted activity
+     * @throws RuntimeException wrapping any parsing or conversion failure
+     */
+    public Activity deserialize(String datasiftJson) {
+        try {
+            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
+        } catch (Exception e) {
+            // The throwable is passed as the trailing argument so that SLF4J
+            // reports it, stack trace included, together with the offending document.
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Converts an already parsed Datasift event to an {@link Activity}.
+     *
+     * @param serialized the Datasift event
+     * @return the converted activity
+     * @throws IllegalArgumentException wrapping any failure raised by {@link #convert(Datasift)}
+     */
+    @Override
+    public Activity deserialize(Datasift serialized) {
+        try {
+            return convert(serialized);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+    }
+
+    /**
+     * Converts every event of the list, preserving order.
+     *
+     * @param datasifts the events to convert; must not be null
+     * @return one activity per input event
+     * @throws IllegalArgumentException if any single event cannot be converted
+     */
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> datasifts) {
+        List<Activity> activities = Lists.newArrayList();
+        for( Datasift datasift : datasifts ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    /**
+     * Builds a generator whose id and display name are both the interaction source.
+     *
+     * @param interaction the interaction to read the source from
+     * @return a new generator
+     */
+    public static Generator buildGenerator(Interaction interaction) {
+        Generator generator = new Generator();
+        generator.setDisplayName(interaction.getSource());
+        generator.setId(interaction.getSource());
+        return generator;
+    }
+
+    /**
+     * Placeholder: no icon is derived from a generic interaction.
+     *
+     * @param interaction unused
+     * @return always null
+     */
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    /**
+     * Builds a provider named after the interaction type.
+     *
+     * @param interaction the interaction to read the type from
+     * @return a new provider with id {@code "id:providers:" + type}
+     */
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:"+interaction.getType());
+        provider.setDisplayName(interaction.getType());
+        return provider;
+    }
+
+    /**
+     * Placeholder: no urls are derived from a generic interaction.
+     *
+     * @param interaction unused
+     * @return always null
+     */
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    /**
+     * Stores the original event under the {@code "datasift"} key of the activity extensions.
+     *
+     * @param activity the activity to decorate
+     * @param datasift the source event to attach
+     */
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    /**
+     * Joins the given parts with {@code ":"} behind the {@code "id:datasift"} prefix.
+     *
+     * @param idparts the trailing id segments
+     * @return the formatted id, e.g. {@code id:datasift:post:123}
+     * @throws NullPointerException if any part is null (Guava's Joiner rejects nulls)
+     */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    /**
+     * Maps the interaction section of a Datasift event onto a new {@link Activity}.
+     *
+     * @param event the event to convert; it and its interaction must not be null
+     * @return the populated activity
+     * @throws NullPointerException if the event or its interaction is null
+     */
+    public Activity convert(Datasift event) {
+
+        Preconditions.checkNotNull(event);
+        Interaction interaction = Preconditions.checkNotNull(event.getInteraction());
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(interaction));
+        activity.setVerb(selectVerb(event));
+        activity.setObject(buildActivityObject(interaction));
+        // the id embeds the verb, so the verb has to be set before this line
+        activity.setId(formatId(activity.getVerb(), interaction.getId()));
+        activity.setTarget(buildTarget(interaction));
+        activity.setPublished(interaction.getCreatedAt());
+        activity.setGenerator(buildGenerator(interaction));
+        activity.setIcon(getIcon(interaction));
+        activity.setProvider(buildProvider(interaction));
+        activity.setTitle(interaction.getTitle());
+        activity.setContent(interaction.getContent());
+        activity.setUrl(interaction.getLink());
+        activity.setLinks(getLinks(event));
+        addDatasiftExtension(activity, event);
+        if( interaction.getGeo() != null) {
+            addLocationExtension(activity, interaction);
+        }
+        return activity;
+    }
+
+    /**
+     * Chooses the activity verb; generic interactions are always posts.
+     *
+     * @param event unused
+     * @return {@code "post"}
+     */
+    private String selectVerb(Datasift event) {
+        return "post";
+    }
+
+    /**
+     * Builds the actor from the interaction author.
+     *
+     * The display name prefers the username over the name; the id prefers the
+     * numeric author id and falls back to the display name.
+     *
+     * @param interaction the interaction to read the author from
+     * @return the actor; an empty actor when the interaction has no author
+     */
+    public Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
+        if(author == null) {
+            LOGGER.warn("Interaction does not contain author information.");
+            return actor;
+        }
+        String userName = author.getUsername();
+        Long id = author.getId();
+        String displayName = userName != null ? userName : author.getName();
+        actor.setDisplayName(displayName);
+        actor.setId(id != null ? id.toString() : displayName);
+
+        Image image = new Image();
+        image.setUrl(author.getAvatar());
+        actor.setImage(image);
+        if(author.getLink() != null) {
+            actor.setUrl(author.getLink());
+        }
+        return actor;
+    }
+
+    /**
+     * Builds the activity object describing the posted content.
+     *
+     * @param interaction the interaction to read content type, link, id and content from
+     * @return a new activity object whose id is {@code formatId("post", interactionId)}
+     */
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        actObj.setUrl(interaction.getLink());
+        actObj.setId(formatId("post",interaction.getId()));
+        actObj.setContent(interaction.getContent());
+
+        return actObj;
+    }
+
+    /**
+     * Collects the normalized urls of the event that are Strings.
+     *
+     * Null entries are skipped silently; non-String entries are skipped with a warning.
+     *
+     * @param event the event to read the links section from
+     * @return null when the event has no links section; otherwise the String
+     *         urls found, which is an empty list when the section carries no
+     *         normalized urls
+     */
+    public static List<String> getLinks(Datasift event) {
+        List<String> result = Lists.newArrayList();
+        Links links = event.getLinks();
+        if(links == null) {
+            return null;
+        }
+        // A links section without normalized urls simply contributes nothing,
+        // instead of failing the whole conversion with a NullPointerException.
+        if(links.getNormalizedUrl() == null) {
+            return result;
+        }
+        for(Object link : links.getNormalizedUrl()) {
+            if(link == null) {
+                continue;
+            }
+            if(link instanceof String) {
+                result.add((String) link);
+            } else {
+                LOGGER.warn("link is not of type String : {}", link.getClass().getName());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Placeholder: generic interactions have no target.
+     *
+     * @param interaction unused
+     * @return always null
+     */
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    /**
+     * Stores the interaction coordinates under {@code extensions.location.coordinates}
+     * as a map with {@code "latitude"} and {@code "longitude"} entries.
+     *
+     * @param activity the activity to decorate
+     * @param interaction the interaction to read from; its geo section must not be null
+     */
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+    /**
+     * Returns the first element of the list cast to String.
+     *
+     * @param list the list to read from; may be null
+     * @return the first element, or null when the list is null or empty
+     * @throws ClassCastException if the first element is not a String
+     */
+    public static String firstStringIfNotNull(List<Object> list) {
+        if( list == null || list.isEmpty() ) {
+            return null;
+        }
+        return (String) list.get(0);
+    }
+}


[9/9] incubator-streams git commit: revamped testing, more cases more conditions per test

Posted by sb...@apache.org.
revamped testing, more cases more conditions per test


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/99e70b48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/99e70b48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/99e70b48

Branch: refs/heads/master
Commit: 99e70b48c426a2753f6e385c561df7286f15e3f6
Parents: dabc511
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 11 13:50:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Tue Nov 11 13:50:18 2014 -0600

----------------------------------------------------------------------
 .../serializer/DatasiftEventClassifier.java     |   2 +-
 .../DatasiftTweetActivitySerializer.java        | 272 -------------------
 .../DatasiftTwitterActivitySerializer.java      | 272 +++++++++++++++++++
 .../DatasiftActivitySerializerTest.java         |  80 ++----
 .../serializer/DatasiftEventClassifierTest.java |  14 +-
 ...DatasiftInstagramActivitySerializerTest.java |  43 +++
 ...tasiftInteractionActivitySerializerTest.java |  48 ++++
 .../DatasiftTwitterActivitySerializerTest.java  |  43 +++
 8 files changed, 437 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
index 0169f17..7d7d547 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -43,7 +43,7 @@ public class DatasiftEventClassifier {
     public static ActivitySerializer bestSerializer(Datasift event) {
 
         if(event.getTwitter() != null) {
-            return DatasiftTweetActivitySerializer.getInstance();
+            return DatasiftTwitterActivitySerializer.getInstance();
         } else if(event.getInstagram() != null) {
             return DatasiftInstagramActivitySerializer.getInstance();
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
deleted file mode 100644
index 6fd19e7..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.serializer;
-
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Author;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
-import org.apache.streams.datasift.twitter.Retweet;
-import org.apache.streams.datasift.twitter.Twitter;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftTweetActivitySerializer extends DatasiftInteractionActivitySerializer {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTweetActivitySerializer.class);
-
-    private static DatasiftTweetActivitySerializer instance = new DatasiftTweetActivitySerializer();
-
-    public static DatasiftTweetActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public Activity convert(Datasift event) {
-        Activity activity = new Activity();
-        Twitter twitter = event.getTwitter();
-        boolean retweet = twitter.getRetweet() != null;
-
-        activity.setActor(buildActor(event, twitter)); //TODO
-        if(retweet) {
-            activity.setVerb("share");
-        } else {
-            activity.setVerb("post");
-        }
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(TwitterActivityUtil.getProvider());
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        if(retweet)
-            activity.setLinks(getLinks(twitter.getRetweet()));
-        else
-            activity.setLinks(getLinks(twitter));
-        addDatasiftExtension(activity, event);
-        if( twitter.getGeo() != null) {
-            addLocationExtension(activity, twitter);
-        }
-        addTwitterExtensions(activity, twitter, event.getInteraction());
-        return activity;
-    }
-
-    /**
-     * Get the links from this tweet as a list
-     * @param twitter
-     * @return the links from the tweet
-     */
-    public List<String> getLinks(Twitter twitter) {
-        return getLinks(twitter.getLinks());
-    }
-
-    /**
-     * Get the links from this tweet as a list
-     * @param retweet
-     * @return the links from the tweet
-     */
-    public List<String> getLinks(Retweet retweet) {
-        return getLinks(retweet.getLinks());
-    }
-
-    /**
-     * Converts the list of objects to a list of strings
-     * @param links
-     * @return
-     */
-    private List<String> getLinks(List<Object> links) {
-        if(links == null)
-            return Lists.newArrayList();
-        List<String> result = Lists.newLinkedList();
-        for(Object obj : links) {
-            if(obj instanceof String) {
-                result.add((String) obj);
-            } else {
-                LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
-            }
-        }
-        return result;
-    }
-
-    public Actor buildActor(Datasift event, Twitter twitter) {
-        DatasiftTwitterUser user = twitter.getUser();
-        Actor actor = super.buildActor(event.getInteraction());
-        if(user == null) {
-            user = twitter.getRetweet().getUser();
-        }
-
-        actor.setDisplayName(user.getName());
-        actor.setId(formatId(Optional.fromNullable(
-                user.getIdStr())
-                .or(Optional.of(user.getId().toString()))
-                .orNull()));
-        actor.setSummary(user.getDescription());
-        try {
-            actor.setPublished(user.getCreatedAt());
-        } catch (Exception e) {
-            LOGGER.warn("Exception trying to parse date : {}", e);
-        }
-
-        if(user.getUrl() != null) {
-            actor.setUrl(user.getUrl());
-        }
-
-        Map<String, Object> extensions = new HashMap<String,Object>();
-        extensions.put("location", user.getLocation());
-        extensions.put("posts", user.getStatusesCount());
-        extensions.put("followers", user.getFollowersCount());
-        extensions.put("screenName", user.getScreenName());
-        if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getFavouritesCount());
-        }
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        Author author = event.getInteraction().getAuthor();
-        if( author != null )
-            profileUrl = author.getAvatar();
-        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
-            Object url = user.getProfileImageUrlHttps();
-            if(url instanceof String)
-                profileUrl = (String) url;
-        }
-        if(profileUrl == null) {
-            profileUrl = user.getProfileImageUrl();
-        }
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        actor.setAdditionalProperty("extensions", extensions);
-        return actor;
-    }
-
-    public void addLocationExtension(Activity activity, Twitter twitter) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = Maps.newHashMap();
-        double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
-        Map<String, Object> coords = Maps.newHashMap();
-        coords.put("coordinates", coordiantes);
-        coords.put("type", "geo_point");
-        location.put("coordinates", coords);
-        extensions.put("location", location);
-    }
-
-    public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
-        Retweet retweet = twitter.getRetweet();
-        Map<String, Object> extensions = ensureExtensions(activity);
-        List<String> hashTags = Lists.newLinkedList();
-        List<Object> hts = Lists.newLinkedList();
-        if(twitter.getHashtags() != null) {
-            hts = twitter.getHashtags();
-        } else if (retweet != null) {
-            hts = retweet.getHashtags();
-        }
-        if(hts != null) {
-            for(Object ht : twitter.getHashtags()) {
-                if(ht instanceof String) {
-                    hashTags.add((String) ht);
-                } else {
-                    LOGGER.warn("Hashtag was not instance of String : {}", ht.getClass().getName());
-                }
-            }
-        }
-        extensions.put("hashtags", hashTags);
-
-
-        if(retweet != null) {
-            Map<String, Object> rebroadcasts = Maps.newHashMap();
-            rebroadcasts.put("perspectival", true);
-            rebroadcasts.put("count", retweet.getCount());
-            extensions.put("rebroadcasts", rebroadcasts);
-        }
-
-        if(interaction.getAdditionalProperties() != null) {
-            ArrayList<Map<String,Object>> userMentions = createUserMentions(interaction);
-
-            if(userMentions.size() > 0)
-                extensions.put("user_mentions", userMentions);
-        }
-
-        extensions.put("keywords", interaction.getContent());
-    }
-
-    /**
-     * Returns an ArrayList of all UserMentions in this interaction
-     * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
-     * If those lists are the same size, then they will be merged into individual UserMention
-     * objects. However, if they are not the same size, a new UserMention object will be created
-     * for each entry in both lists.
-     *
-     * @param interaction
-     * @return
-     */
-    private ArrayList<Map<String,Object>> createUserMentions(Interaction interaction) {
-        ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
-        ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
-        ArrayList<Map<String,Object>> userMentions = new ArrayList<Map<String,Object>>();
-
-        if(mentions != null && !mentions.isEmpty()) {
-            for(int x = 0; x < mentions.size(); x ++) {
-                Map<String, Object> actor = new HashMap<String, Object>();
-                actor.put("displayName", mentions.get(x));
-                actor.put("handle", mentions.get(x));
-
-                userMentions.add(actor);
-            }
-        }
-        if(mentionIds != null && !mentionIds.isEmpty()) {
-            for(int x = 0; x < mentionIds.size(); x ++) {
-                Map<String, Object> actor = new HashMap<String, Object>();
-                actor.put("id", "id:twitter:" + mentionIds.get(x));
-
-                userMentions.add(actor);
-            }
-        }
-
-        return userMentions;
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
new file mode 100644
index 0000000..8ac84f6
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
@@ -0,0 +1,272 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.serializer;
+
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Author;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
+import org.apache.streams.datasift.twitter.Retweet;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftTwitterActivitySerializer extends DatasiftInteractionActivitySerializer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTwitterActivitySerializer.class);
+
+    private static DatasiftTwitterActivitySerializer instance = new DatasiftTwitterActivitySerializer();
+
+    public static DatasiftTwitterActivitySerializer getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Activity convert(Datasift event) {
+        Activity activity = new Activity();
+        Twitter twitter = event.getTwitter();
+        boolean retweet = twitter.getRetweet() != null;
+
+        activity.setActor(buildActor(event, twitter)); //TODO
+        if(retweet) {
+            activity.setVerb("share");
+        } else {
+            activity.setVerb("post");
+        }
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(TwitterActivityUtil.getProvider());
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        if(retweet)
+            activity.setLinks(getLinks(twitter.getRetweet()));
+        else
+            activity.setLinks(getLinks(twitter));
+        addDatasiftExtension(activity, event);
+        if( twitter.getGeo() != null) {
+            addLocationExtension(activity, twitter);
+        }
+        addTwitterExtensions(activity, twitter, event.getInteraction());
+        return activity;
+    }
+
+    /**
+     * Get the links from this tweet as a list
+     * @param twitter
+     * @return the links from the tweet
+     */
+    public List<String> getLinks(Twitter twitter) {
+        return getLinks(twitter.getLinks());
+    }
+
+    /**
+     * Get the links from this tweet as a list
+     * @param retweet
+     * @return the links from the tweet
+     */
+    public List<String> getLinks(Retweet retweet) {
+        return getLinks(retweet.getLinks());
+    }
+
+    /**
+     * Converts the list of objects to a list of strings
+     * @param links
+     * @return
+     */
+    private List<String> getLinks(List<Object> links) {
+        if(links == null)
+            return Lists.newArrayList();
+        List<String> result = Lists.newLinkedList();
+        for(Object obj : links) {
+            if(obj instanceof String) {
+                result.add((String) obj);
+            } else {
+                LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Builds the actor from the interaction author, enriched with the Twitter user's details
+     * (the retweet's user when the tweet has none). The id prefers id_str; the numeric id is
+     * only dereferenced when id_str is absent, so a missing numeric id alone cannot NPE.
+     */
+    public Actor buildActor(Datasift event, Twitter twitter) {
+        DatasiftTwitterUser user = twitter.getUser();
+        Actor actor = super.buildActor(event.getInteraction());
+        if(user == null) {
+            user = twitter.getRetweet().getUser();
+        }
+        actor.setDisplayName(user.getName());
+        String userId = user.getIdStr() != null ? user.getIdStr() : user.getId().toString();
+        actor.setId(formatId(userId));
+        actor.setSummary(user.getDescription());
+        try {
+            actor.setPublished(user.getCreatedAt());
+        } catch (Exception e) {
+            LOGGER.warn("Exception trying to parse date", e);
+        }
+
+        if(user.getUrl() != null) {
+            actor.setUrl(user.getUrl());
+        }
+
+        Map<String, Object> extensions = new HashMap<String,Object>();
+        extensions.put("location", user.getLocation());
+        extensions.put("posts", user.getStatusesCount());
+        extensions.put("followers", user.getFollowersCount());
+        extensions.put("screenName", user.getScreenName());
+        if(user.getAdditionalProperties() != null) {
+            extensions.put("favorites", user.getFavouritesCount());
+        }
+
+        Image profileImage = new Image();
+        String profileUrl = null;
+        Author author = event.getInteraction().getAuthor();
+        if( author != null )
+            profileUrl = author.getAvatar();
+        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
+            Object url = user.getProfileImageUrlHttps();
+            if(url instanceof String)
+                profileUrl = (String) url;
+        }
+        if(profileUrl == null) {
+            profileUrl = user.getProfileImageUrl();
+        }
+        profileImage.setUrl(profileUrl);
+        actor.setImage(profileImage);
+        actor.setAdditionalProperty("extensions", extensions);
+        return actor;
+    }
+
+    public void addLocationExtension(Activity activity, Twitter twitter) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = Maps.newHashMap();
+        double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
+        Map<String, Object> coords = Maps.newHashMap();
+        coords.put("coordinates", coordiantes);
+        coords.put("type", "geo_point");
+        location.put("coordinates", coords);
+        extensions.put("location", location);
+    }
+
+    /**
+     * Adds hashtags, rebroadcast info, user mentions and keywords to the activity's extensions.
+     * Hashtags are taken from the tweet, falling back to the retweet when the tweet's list is null.
+     */
+    public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
+        Retweet retweet = twitter.getRetweet();
+        Map<String, Object> extensions = ensureExtensions(activity);
+        List<String> hashTags = Lists.newLinkedList();
+        List<Object> hts = twitter.getHashtags();
+        if(hts == null && retweet != null) {
+            hts = retweet.getHashtags();
+        }
+        if(hts != null) {
+            // iterate the list selected above; twitter.getHashtags() itself may be null here
+            for(Object ht : hts) {
+                if(ht instanceof String) {
+                    hashTags.add((String) ht);
+                } else {
+                    LOGGER.warn("Hashtag was not instance of String : {}", ht.getClass().getName());
+                }
+            }
+        }
+        extensions.put("hashtags", hashTags);
+
+        if(retweet != null) {
+            Map<String, Object> rebroadcasts = Maps.newHashMap();
+            rebroadcasts.put("perspectival", true);
+            rebroadcasts.put("count", retweet.getCount());
+            extensions.put("rebroadcasts", rebroadcasts);
+        }
+
+        if(interaction.getAdditionalProperties() != null) {
+            ArrayList<Map<String,Object>> userMentions = createUserMentions(interaction);
+            if(userMentions.size() > 0)
+                extensions.put("user_mentions", userMentions);
+        }
+        extensions.put("keywords", interaction.getContent());
+    }
+
+    /**
+     * Returns an ArrayList of all UserMentions in this interaction
+     * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
+     * The lists are therefore not merged, even when they are the same size: one UserMention
+     * map is created for each handle (displayName and handle), followed by one map for each
+     * id (id only).
+     *
+     * @param interaction
+     * @return
+     */
+    private ArrayList<Map<String,Object>> createUserMentions(Interaction interaction) {
+        ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+        ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+        ArrayList<Map<String,Object>> userMentions = new ArrayList<Map<String,Object>>();
+
+        if(mentions != null && !mentions.isEmpty()) {
+            for(int x = 0; x < mentions.size(); x ++) {
+                Map<String, Object> actor = new HashMap<String, Object>();
+                actor.put("displayName", mentions.get(x));
+                actor.put("handle", mentions.get(x));
+
+                userMentions.add(actor);
+            }
+        }
+        if(mentionIds != null && !mentionIds.isEmpty()) {
+            for(int x = 0; x < mentionIds.size(); x ++) {
+                Map<String, Object> actor = new HashMap<String, Object>();
+                actor.put("id", "id:twitter:" + mentionIds.get(x));
+
+                userMentions.add(actor);
+            }
+        }
+
+        return userMentions;
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 162526b..8f7ad43 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -4,10 +4,13 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Scanner;
@@ -17,18 +20,27 @@ import static org.junit.Assert.assertNotNull;
 
 public class DatasiftActivitySerializerTest {
 
-    private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
+    protected ActivitySerializer SERIALIZER;
 
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+    protected static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+
+    @Before
+    public void initSerializer() {
+        SERIALIZER = new DatasiftActivitySerializer();
+    }
 
     @Test
-    public void testGeneralConversion() throws Exception {
+    public void testConversion() throws Exception {
         Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
         String line = null;
         while(scanner.hasNextLine()) {
             try {
                 line = scanner.nextLine();
-                testGeneralConversion(line);
+                Datasift item = MAPPER.readValue(line, Datasift.class);
+                testConversion(item);
+                String json = MAPPER.writeValueAsString(item);
+                testDeserNoNull(json);
+                testDeserNoAddProps(json);
             } catch (Exception e) {
                 System.err.println(line);
                 throw e;
@@ -36,59 +48,27 @@ public class DatasiftActivitySerializerTest {
         }
     }
 
-    @Test
-    public void testTwitterConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            line = scanner.nextLine();
-            testGeneralConversion(line);
-            testDeserNoNull(line);
-            testDeserNoAddProps(line);
-
-            System.out.println("ORIGINAL -> "+line);
-            System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
-            System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
-        }
-    }
-
-    @Test
-    public void testInstagramConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            line = scanner.nextLine();
-            testGeneralConversion(line);
-            System.out.println("ORIGINAL -> "+line);
-            System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
-            System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));
-        }
-    }
-
     /**
      * Test that the minimum number of things that an activity has
-     * @param json
+     * @param item
      */
-    private void testGeneralConversion(String json) throws Exception {
-        Activity activity = SERIALIZER.deserialize(json);
-        assertNotNull(json, activity.getId());
-        assertNotNull(json, activity.getPublished());
-        assertNotNull(json, activity.getProvider());
-        assertNotNull(json, activity.getUrl());
-        assertNotNull(json, activity.getVerb());
+    protected void testConversion(Datasift item) throws Exception {
+        Activity activity = SERIALIZER.deserialize(item);
+        assertNotNull("activity.id", activity.getId());
+        assertNotNull("activity.published", activity.getPublished());
+        assertNotNull("activity.provider", activity.getProvider());
+        assertNotNull("activity.url", activity.getUrl());
+        assertNotNull("activity.verb", activity.getVerb());
         Actor actor = activity.getActor();
-        assertNotNull(json, actor);
-
+        assertNotNull("activity.actor", actor);
     }
 
     /**
      * Test that null fields are not present
      * @param json
      */
-    private void testDeserNoNull(String json) throws Exception {
-        Activity ser = SERIALIZER.deserialize(json);
-        String deser = MAPPER.writeValueAsString(ser);
-        int nulls = StringUtils.countMatches(deser, ":null");
+    protected void testDeserNoNull(String json) throws Exception {
+        int nulls = StringUtils.countMatches(json, ":null");
         assertEquals(0l, (long)nulls);
 
     }
@@ -97,10 +77,8 @@ public class DatasiftActivitySerializerTest {
      * Test that null fields are not present
      * @param json
      */
-    private void testDeserNoAddProps(String json) throws Exception {
-        Activity ser = SERIALIZER.deserialize(json);
-        String deser = MAPPER.writeValueAsString(ser);
-        int nulls = StringUtils.countMatches(deser, "additionalProperties:{");
+    protected void testDeserNoAddProps(String json) throws Exception {
+        int nulls = StringUtils.countMatches(json, "additionalProperties:{");
         assertEquals(0l, (long)nulls);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
index 2004654..fda57c4 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -18,25 +18,13 @@
 
 package org.apache.streams.datasift.serializer;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.instagram.Instagram;
 import org.apache.streams.datasift.twitter.Twitter;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Scanner;
@@ -56,7 +44,7 @@ public class DatasiftEventClassifierTest {
             line = scanner.nextLine();
             Datasift datasift = MAPPER.readValue(line, Datasift.class);
             assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
-            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTwitterActivitySerializer);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
new file mode 100644
index 0000000..5350d74
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
@@ -0,0 +1,43 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftInstagramActivitySerializerTest extends DatasiftActivitySerializerTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftInstagramActivitySerializer();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift item = MAPPER.readValue(line, Datasift.class);
+            testConversion(item);
+            String json = MAPPER.writeValueAsString(item);
+            testDeserNoNull(json);
+            testDeserNoAddProps(json);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
new file mode 100644
index 0000000..21d4ebb
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
@@ -0,0 +1,48 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftInteractionActivitySerializerTest extends DatasiftActivitySerializerTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftInteractionActivitySerializer();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftInteractionActivitySerializerTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            try {
+                line = scanner.nextLine();
+                Datasift item = MAPPER.readValue(line, Datasift.class);
+                testConversion(item);
+                String json = MAPPER.writeValueAsString(item);
+                testDeserNoNull(json);
+                testDeserNoAddProps(json);
+            } catch (Exception e) {
+                System.err.println(line);
+                throw e;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99e70b48/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
new file mode 100644
index 0000000..33b1f77
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
@@ -0,0 +1,43 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftTwitterActivitySerializerTest extends DatasiftActivitySerializerTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftTwitterActivitySerializer();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftTwitterActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift item = MAPPER.readValue(line, Datasift.class);
+            testConversion(item);
+            String json = MAPPER.writeValueAsString(item);
+            testDeserNoNull(json);
+            testDeserNoAddProps(json);
+        }
+    }
+
+}


[3/9] incubator-streams git commit: deleting this was a mistake

Posted by sb...@apache.org.
deleting this was a mistake


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9bfaaa55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9bfaaa55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9bfaaa55

Branch: refs/heads/master
Commit: 9bfaaa551c9814073c303258d0edd747f1e75435
Parents: 2f6a657
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 16:53:50 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 16:53:50 2014 -0800

----------------------------------------------------------------------
 .../processor/TwitterProfileProcessor.java      | 133 +++++++++++++++++++
 1 file changed, 133 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9bfaaa55/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
new file mode 100644
index 0000000..674eef1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.processor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
+
+    private ObjectMapper mapper = new StreamsTwitterMapper();
+
+    private Queue<StreamsDatum> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    /**
+     * Polls inQueue until a datum whose document equals TERMINATE arrives, offering the
+     * output of process() for every other datum to outQueue.
+     */
+    @Override
+    public void run() {
+        while(true) {
+            try {
+                StreamsDatum item = inQueue.poll();
+                // poll() yields null on an empty queue; the sleep below then acts as back-off
+                if(item != null && TERMINATE.equals(item.getDocument())) {
+                    LOGGER.info("Terminating!");
+                    break;
+                }
+                Thread.sleep(new Random().nextInt(100));
+                if(item != null) {
+                    for( StreamsDatum entry : process(item)) {
+                        outQueue.offer(entry);
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.warn("Error in TwitterProfileProcessor run loop", e);
+            }
+        }
+    }
+
+    public StreamsDatum createStreamsDatum(User user) {
+        return new StreamsDatum(user, user.getIdStr());
+    }
+
+    /**
+     * Extracts the Twitter user from a tweet, retweet or user document.
+     * Returns an empty list for any other detected class or when processing fails.
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+        String item;
+        try {
+            // first check for valid json
+            // since data is coming from outside provider, we don't know what type the events are
+            if( entry.getDocument() instanceof String) {
+                item = (String) entry.getDocument();
+            } else {
+                item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
+            }
+
+            Class inClass = TwitterEventClassifier.detectClass(item);
+            User user;
+
+            if ( inClass.equals( Tweet.class )) {
+                LOGGER.debug("TWEET");
+                Tweet tweet = mapper.readValue(item, Tweet.class);
+                user = tweet.getUser();
+                result.add(createStreamsDatum(user));
+            } else if ( inClass.equals( Retweet.class )) {
+                LOGGER.debug("RETWEET");
+                Retweet retweet = mapper.readValue(item, Retweet.class);
+                user = retweet.getRetweetedStatus().getUser();
+                result.add(createStreamsDatum(user));
+            } else if ( inClass.equals( User.class )) {
+                LOGGER.debug("USER");
+                user = mapper.readValue(item, User.class);
+                result.add(createStreamsDatum(user));
+            } else {
+                return Lists.newArrayList();
+            }
+            return result;
+        } catch (Exception e) {
+            // log through SLF4J with the cause attached instead of printing to stderr
+            LOGGER.warn("Error processing {}", entry, e);
+            return Lists.newArrayList();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+};


[7/9] incubator-streams git commit: add license header

Posted by sb...@apache.org.
add license header


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a6f86266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a6f86266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a6f86266

Branch: refs/heads/master
Commit: a6f8626621d2ecb34071628d957e6c69673c2597
Parents: 800bce9
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 11 12:55:02 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Tue Nov 11 12:55:02 2014 -0600

----------------------------------------------------------------------
 .../serializer/DatasiftEventClassifier.java       | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a6f86266/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
index e90de6a..0169f17 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.datasift.serializer;
 
 import org.apache.streams.data.ActivitySerializer;


[5/9] incubator-streams git commit: Merge branch 'STREAMS-212' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-212

Posted by sb...@apache.org.
Merge branch 'STREAMS-212' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-212


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/612676d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/612676d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/612676d4

Branch: refs/heads/master
Commit: 612676d4e56a0011d5b402585745887849fcb349
Parents: aefa064 9bfaaa5
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 16:57:55 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 16:57:55 2014 -0800

----------------------------------------------------------------------
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  12 +-
 .../DatasiftDefaultActivitySerializer.java      | 214 -------------------
 .../DatasiftInstagramActivitySerializer.java    |   8 +-
 .../DatasiftTweetActivitySerializer.java        |   8 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  11 +-
 .../com/datasift/test/DatasiftSerDeTest.java    |  18 +-
 .../DatasiftTypeConverterProcessorTest.java     |  72 -------
 .../DatasiftActivitySerializerTest.java         |   6 +-
 .../streams-provider-twitter/pom.xml            |   6 +
 .../processor/TwitterEventProcessor.java        | 194 -----------------
 .../twitter/processor/TwitterTypeConverter.java | 209 ------------------
 .../provider/TwitterEventClassifier.java        |  58 +++--
 .../serializer/StreamsTwitterMapper.java        |  11 +-
 .../TwitterJsonActivitySerializer.java          |  24 +--
 .../TwitterJsonDeleteActivitySerializer.java    |   6 +
 .../TwitterJsonRetweetActivitySerializer.java   |   6 +
 .../TwitterJsonTweetActivitySerializer.java     |   6 +
 .../TwitterJsonUserActivitySerializer.java      |   6 +
 ...erJsonUserstreameventActivitySerializer.java |   6 +
 .../streams/twitter/test/SimpleTweetTest.java   |  11 +-
 .../twitter/test/TweetActivitySerDeTest.java    |   6 +-
 .../streams/twitter/test/TweetSerDeTest.java    |   6 +-
 .../test/TwitterEventClassifierTest.java        |  34 +++
 24 files changed, 185 insertions(+), 760 deletions(-)
----------------------------------------------------------------------



[4/9] incubator-streams git commit: adding new test to SCM

Posted by sb...@apache.org.
adding new test to SCM


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aefa064d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aefa064d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aefa064d

Branch: refs/heads/master
Commit: aefa064d54a8be673a546031eb3587c1e42d6f65
Parents: 9aebd0b
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 16:57:33 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 16:57:33 2014 -0800

----------------------------------------------------------------------
 .../serializer/DatasiftEventClassifierTest.java | 76 ++++++++++++++++++++
 1 file changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aefa064d/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
new file mode 100644
index 0000000..2004654
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class DatasiftEventClassifierTest {
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+
+    @Test
+    public void testTwitterDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer);
+        }
+    }
+
+    @Test
+    public void testInstagramDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer);
+        }
+    }
+    
+
+}


[8/9] incubator-streams git commit: Merge branch 'master' into STREAMS-212

Posted by sb...@apache.org.
Merge branch 'master' into STREAMS-212


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dabc5116
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dabc5116
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dabc5116

Branch: refs/heads/master
Commit: dabc51162f1260d8cc7bf8b66902264049876b13
Parents: a6f8626 bfa9466
Author: sblackmon <sb...@apache.org>
Authored: Tue Nov 11 12:55:59 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Tue Nov 11 12:55:59 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  58 ++++
 streams-components/pom.xml                      |  62 +++++
 streams-components/streams-http/README.md       |  16 ++
 streams-components/streams-http/pom.xml         | 153 +++++++++++
 .../components/http/HttpConfigurator.java       |  62 +++++
 .../http/processor/SimpleHTTPGetProcessor.java  | 268 +++++++++++++++++++
 .../http/provider/SimpleHTTPGetProvider.java    | 230 ++++++++++++++++
 .../components/http/HttpConfiguration.json      |  50 ++++
 .../http/HttpProcessorConfiguration.json        |  28 ++
 .../http/HttpProviderConfiguration.json         |  18 ++
 streams-contrib/pom.xml                         |   5 +-
 streams-contrib/streams-persist-console/pom.xml |  34 +++
 .../streams/console/ConsolePersistReader.java   |  11 +-
 .../streams/console/ConsolePersistWriter.java   |  11 +-
 .../streams-processor-peoplepattern/pom.xml     | 138 ++++++++++
 .../peoplepattern/AccountTypeProcessor.java     |  75 ++++++
 .../peoplepattern/DemographicsProcessor.java    |  76 ++++++
 .../streams/peoplepattern/AccountType.json      |  27 ++
 .../streams/peoplepattern/Demographics.json     |  60 +++++
 .../resources/templates/peoplepatternactor.json |  25 ++
 .../api/FacebookPostActivitySerializer.java     |   1 -
 .../google-gplus/pom.xml                        |  32 +++
 .../processor/GooglePlusCommentProcessor.java   |  87 ++++++
 .../gplus/provider/AbstractGPlusProvider.java   | 234 ++++++++++++++++
 .../gplus/provider/GPlusActivitySerializer.java |   4 +-
 .../gplus/provider/GPlusDataCollector.java      |  50 ++++
 .../provider/GPlusHistoryProviderTask.java      | 106 --------
 .../google/gplus/provider/GPlusProvider.java    | 189 -------------
 .../provider/GPlusUserActivityCollector.java    | 108 ++++++++
 .../provider/GPlusUserActivityProvider.java     |  18 ++
 .../gplus/provider/GPlusUserDataCollector.java  |  79 ++++++
 .../gplus/provider/GPlusUserDataProvider.java   |  18 ++
 .../util/GPlusCommentDeserializer.java          |  98 +++++++
 .../serializer/util/GooglePlusActivityUtil.java |  50 ++++
 .../com/google/gplus/GPlusConfiguration.json    |  44 ++-
 .../gplus/GooglePlusCommentSerDeTest.java       | 114 ++++++++
 .../provider/TestAbstractGPlusProvider.java     |  82 ++++++
 .../TestGPlusUserActivityCollector.java         | 268 +++++++++++++++++++
 .../provider/TestGPlusUserDataCollector.java    | 131 +++++++++
 .../resources/google_plus_comments_jsons.txt    |   3 +
 streams-contrib/streams-provider-google/pom.xml |   6 +
 .../streams-provider-twitter/pom.xml            |   5 +
 .../processor/TwitterUrlApiProcessor.java       |  73 +++++
 .../provider/TwitterTimelineProvider.java       |   7 +-
 streams-pojo-extensions/pom.xml                 |  64 +++++
 .../apache/streams/data/util/ExtensionUtil.java | 108 ++++++++
 .../apache/streams/data/util/ActivityUtil.java  |  14 +-
 .../jackson/StreamsDateTimeDeserializer.java    |  23 +-
 .../streams/jackson/StreamsJacksonMapper.java   |  20 ++
 .../streams/jackson/StreamsJacksonModule.java   |   9 +
 .../org/apache/streams/pojo/json/activity.json  |   3 +-
 .../org/apache/streams/pojo/json/object.json    |   2 +-
 .../local/builders/LocalStreamBuilder.java      |   5 +
 .../streams/local/tasks/StreamsMergeTask.java   |   7 +
 .../local/tasks/StreamsPersistWriterTask.java   |  18 +-
 .../local/tasks/StreamsProcessorTask.java       |  20 +-
 .../local/tasks/StreamsProviderTask.java        |  15 ++
 .../apache/streams/local/tasks/StreamsTask.java |   4 +
 .../local/builders/LocalStreamBuilderTest.java  |  32 ++-
 .../streams/local/tasks/BasicTasksTest.java     |  26 +-
 .../backoff/AbstractBackOffStrategy.java        |  15 +-
 61 files changed, 3255 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dabc5116/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-twitter/pom.xml
index 9c99a92,0bf3fe7..3880135
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@@ -50,14 -50,13 +50,19 @@@
          </dependency>
          <dependency>
              <groupId>org.apache.streams</groupId>
 +            <artifactId>streams-processor-jackson</artifactId>
 +            <version>${project.version}</version>
 +            <scope>test</scope>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.apache.streams</groupId>
              <artifactId>streams-util</artifactId>
              <version>${project.version}</version>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.streams</groupId>
+             <artifactId>streams-http</artifactId>
+             <version>${project.version}</version>
          </dependency>
          <dependency>
              <groupId>com.google.guava</groupId>


[2/9] incubator-streams git commit: use StreamsJacksonMapper with channel-specific date support deprecate StreamsTwitterMapper deprecate StreamsDatasiftMapper use generic TypeConverterProcessor deprecate/delete now unnecessary classes refactor event clas

Posted by sb...@apache.org.
use StreamsJacksonMapper with channel-specific date support
deprecate StreamsTwitterMapper
deprecate StreamsDatasiftMapper
use generic TypeConverterProcessor
deprecate/delete now unnecessary classes
refactor event classification
add event classification tests
refactor channel serialization
update tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f6a6574
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f6a6574
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f6a6574

Branch: refs/heads/master
Commit: 2f6a6574e1a3d87fd6ae1457f35ce663fc914173
Parents: 9aebd0b
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 15:27:52 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 15:27:52 2014 -0800

----------------------------------------------------------------------
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  12 +-
 .../DatasiftDefaultActivitySerializer.java      | 214 -------------------
 .../DatasiftInstagramActivitySerializer.java    |   8 +-
 .../DatasiftTweetActivitySerializer.java        |   8 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  11 +-
 .../com/datasift/test/DatasiftSerDeTest.java    |  18 +-
 .../DatasiftTypeConverterProcessorTest.java     |  72 -------
 .../DatasiftActivitySerializerTest.java         |   6 +-
 .../serializer/DatasiftEventClassifierTest.java |  76 +++++++
 .../streams-provider-twitter/pom.xml            |   6 +
 .../processor/TwitterEventProcessor.java        | 194 -----------------
 .../processor/TwitterProfileProcessor.java      | 133 ------------
 .../twitter/processor/TwitterTypeConverter.java | 209 ------------------
 .../provider/TwitterEventClassifier.java        |  58 +++--
 .../serializer/StreamsTwitterMapper.java        |  11 +-
 .../TwitterJsonActivitySerializer.java          |  24 +--
 .../TwitterJsonDeleteActivitySerializer.java    |   6 +
 .../TwitterJsonRetweetActivitySerializer.java   |   6 +
 .../TwitterJsonTweetActivitySerializer.java     |   6 +
 .../TwitterJsonUserActivitySerializer.java      |   6 +
 ...erJsonUserstreameventActivitySerializer.java |   6 +
 .../streams/twitter/test/SimpleTweetTest.java   |  11 +-
 .../twitter/test/TweetActivitySerDeTest.java    |   6 +-
 .../streams/twitter/test/TweetSerDeTest.java    |   6 +-
 .../test/TwitterEventClassifierTest.java        |  34 +++
 26 files changed, 261 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
index a00cf23..1166b2e 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -57,7 +57,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
         List<StreamsDatum> result = Lists.newLinkedList();
         Object doc;
         try {
-            doc = this.converter.convert(entry.getDocument(), this.mapper);
+            if( entry.getDocument() instanceof String ) {
+                ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class);
+                doc = this.converter.convert(node, this.mapper);
+            } else {
+                doc = this.converter.convert(entry.getDocument(), this.mapper);
+            }
             if(doc != null) {
                 result.add(new StreamsDatum(doc, entry.getId()));
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 0ada979..b587cd6 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -32,9 +32,6 @@ import java.util.List;
  */
 public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> {
 
-    private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
-    private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
-    private static final DatasiftInstagramActivitySerializer INSTAGRAM_SERIALIZER = new DatasiftInstagramActivitySerializer();
     private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Override
@@ -49,13 +46,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
 
     @Override
     public Activity deserialize(Datasift serialized) throws ActivitySerializerException {
-        if(serialized.getTwitter() != null) {
-            return TWITTER_SERIALIZER.deserialize(serialized);
-        } else if(serialized.getInstagram() != null) {
-            return INSTAGRAM_SERIALIZER.deserialize(serialized);
-        } else {
-            return DEFAULT_SERIALIZER.deserialize(serialized);
-        }
+        ActivitySerializer serializer = DatasiftEventClassifier.bestSerializer(serialized);
+        return serializer.deserialize(serialized);
     }
 
     public Activity deserialize(String json) throws ActivitySerializerException {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
deleted file mode 100644
index 678f67b..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.links.Links;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class);
-
-    public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
-    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+datasift.com.v1.1";
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
-    }
-
-    public Activity deserialize(String datasiftJson) {
-        try {
-            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
-            LOGGER.error("Exception : {}", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) {
-
-        try {
-
-            Activity activity = convert(serialized);
-
-            return activity;
-
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> datasifts) {
-        List<Activity> activities = Lists.newArrayList();
-        for( Datasift datasift : datasifts ) {
-            activities.add(deserialize(datasift));
-        }
-        return activities;
-    }
-
-    public static Generator buildGenerator(Interaction interaction) {
-        Generator generator = new Generator();
-        generator.setDisplayName(interaction.getSource());
-        generator.setId(interaction.getSource());
-        return generator;
-    }
-
-    public static Icon getIcon(Interaction interaction) {
-        return null;
-    }
-
-    public static Provider buildProvider(Interaction interaction) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:"+interaction.getType());
-        provider.setDisplayName(interaction.getType());
-        return provider;
-    }
-
-    public static String getUrls(Interaction interaction) {
-        return null;
-    }
-
-    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("datasift", datasift);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
-    }
-
-    public Activity convert(Datasift event) {
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event.getInteraction()));
-        activity.setVerb(selectVerb(event));
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(buildProvider(event.getInteraction()));
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        activity.setLinks(getLinks(event));
-        addDatasiftExtension(activity, event);
-        if( event.getInteraction().getGeo() != null) {
-            addLocationExtension(activity, event.getInteraction());
-        }
-        return activity;
-    }
-
-    private String selectVerb(Datasift event) {
-        return "post";
-    }
-
-    public Actor buildActor(Interaction interaction) {
-        Actor actor = new Actor();
-        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
-        if(author == null) {
-            LOGGER.warn("Interaction does not contain author information.");
-            return actor;
-        }
-        String userName = author.getUsername();
-        String name = author.getName();
-        Long id  = author.getId();
-        if(userName != null) {
-            actor.setDisplayName(userName);
-        } else {
-            actor.setDisplayName(name);
-        }
-
-        if(id != null) {
-            actor.setId(id.toString());
-        } else {
-            if(userName != null)
-                actor.setId(userName);
-            else
-                actor.setId(name);
-        }
-        Image image = new Image();
-        image.setUrl(interaction.getAuthor().getAvatar());
-        actor.setImage(image);
-        if (interaction.getAuthor().getLink()!=null){
-            actor.setUrl(interaction.getAuthor().getLink());
-        }
-        return actor;
-    }
-
-    public static ActivityObject buildActivityObject(Interaction interaction) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setObjectType(interaction.getContenttype());
-        actObj.setUrl(interaction.getLink());
-        actObj.setId(formatId("post",interaction.getId()));
-        actObj.setContent(interaction.getContent());
-
-        return actObj;
-    }
-
-    public static List<String> getLinks(Datasift event) {
-        List<String> result = Lists.newArrayList();
-        Links links = event.getLinks();
-        if(links == null)
-            return null;
-        for(Object link : links.getNormalizedUrl()) {
-            if(link != null) {
-                if(link instanceof String) {
-                    result.add((String) link);
-                } else {
-                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
-                }
-            }
-        }
-        return result;
-    }
-
-    public static ActivityObject buildTarget(Interaction interaction) {
-        return null;
-    }
-
-    public static void addLocationExtension(Activity activity, Interaction interaction) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<String, Object>();
-        Map<String, Double> coordinates = new HashMap<String, Double>();
-        coordinates.put("latitude", interaction.getGeo().getLatitude());
-        coordinates.put("longitude", interaction.getGeo().getLongitude());
-        location.put("coordinates", coordinates);
-        extensions.put("location", location);
-    }
-
-    public static String firstStringIfNotNull(List<Object> list) {
-        if( list != null && list.size() > 0) {
-            return (String) list.get(0);
-        } else return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
index cb44df2..d121d65 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
@@ -39,10 +39,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 /**
  *
  */
-public class DatasiftInstagramActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class);
 
+    private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer();
+
+    public static DatasiftInstagramActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public Activity convert(Datasift event) {
         Activity activity = super.convert(event);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3c7abda..6fd19e7 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -46,10 +46,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 /**
  *
  */
-public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftTweetActivitySerializer extends DatasiftInteractionActivitySerializer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTweetActivitySerializer.class);
 
+    private static DatasiftTweetActivitySerializer instance = new DatasiftTweetActivitySerializer();
+
+    public static DatasiftTweetActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public Activity convert(Datasift event) {
         Activity activity = new Activity();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
index c5f2abf..93ab28b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -33,16 +33,21 @@ import java.io.IOException;
 
 /**
  * Created by sblackmon on 3/27/14.
+ *
+ * Deprecated: Use StreamsJacksonMapper instead
  */
+@Deprecated()
 public class StreamsDatasiftMapper extends StreamsJacksonMapper {
 
-    public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+    public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+
+    public static final DateTimeFormatter DATASIFT_FORMATTER = DateTimeFormat.forPattern(DATASIFT_FORMAT);
 
     public static final Long getMillis(String dateTime) {
 
         // this function is for pig which doesn't handle exceptions well
         try {
-            Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+            Long result = DATASIFT_FORMATTER.parseMillis(dateTime);
             return result;
         } catch( Exception e ) {
             return null;
@@ -66,7 +71,7 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper {
                     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
                         DateTime result = null;
                         try {
-                            result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+                            result = DATASIFT_FORMATTER.parseDateTime(jpar.getValueAsString());
                         } catch (Exception e) {}
                         if (result == null) {
                             try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
index c4422db..750915e 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
@@ -20,7 +20,9 @@ package com.datasift.test;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -31,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  *
@@ -39,18 +43,11 @@ public class DatasiftSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
 
-
-
-
-    @Test @Ignore
+    @Test
     public void Tests()
     {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
         InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json");
         InputStreamReader isr = new InputStreamReader(is);
         BufferedReader br = new BufferedReader(isr);
@@ -60,6 +57,7 @@ public class DatasiftSerDeTest {
                 String line = br.readLine();
                 LOGGER.debug(line);
                 System.out.println(line);
+
                 Datasift ser = mapper.readValue(line, Datasift.class);
 
                 String de = mapper.writeValueAsString(ser);
@@ -68,7 +66,7 @@ public class DatasiftSerDeTest {
 
                 Datasift serde = mapper.readValue(de, Datasift.class);
 
-//                Assert.assertEquals(ser, serde);
+                Assert.assertEquals(ser, serde);
 
                 LOGGER.debug(mapper.writeValueAsString(serde));
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
deleted file mode 100644
index 015f4e9..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Test;
-
-import java.util.List;
-
-import static junit.framework.Assert.*;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessorTest {
-
-    private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
 "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu
 ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
 le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
 31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
 https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
-
-    @Test
-    public void testTypeConverterToString() {
-        final String ID = "1";
-        StreamsProcessor processor = new DatasiftTypeConverterProcessor(String.class);
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof String);
-        assertEquals(ID, resultDatum.getId());
-    }
-
-    @Test
-    public void testTypeConverterToActivity() {
-        final String ID = "1";
-        StreamsProcessor processor = new DatasiftTypeConverterProcessor(Activity.class);
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof Activity);
-        assertEquals(ID, resultDatum.getId());
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 5f9feed..162526b 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,8 +2,10 @@ package org.apache.streams.datasift.serializer;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.junit.Test;
@@ -16,7 +18,8 @@ import static org.junit.Assert.assertNotNull;
 public class DatasiftActivitySerializerTest {
 
     private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
 
     @Test
     public void testGeneralConversion() throws Exception {
@@ -42,6 +45,7 @@ public class DatasiftActivitySerializerTest {
             testGeneralConversion(line);
             testDeserNoNull(line);
             testDeserNoAddProps(line);
+
             System.out.println("ORIGINAL -> "+line);
             System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
             System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
new file mode 100644
index 0000000..2004654
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class DatasiftEventClassifierTest {
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+
+    @Test
+    public void testTwitterDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer);
+        }
+    }
+
+    @Test
+    public void testInstagramDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer);
+        }
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index edf4959..9c99a92 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -50,6 +50,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-processor-jackson</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
deleted file mode 100644
index fb4615f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "TwitterEventProcessor";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
-    private ObjectMapper mapper = new StreamsTwitterMapper();
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
-    public TwitterEventProcessor(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public TwitterEventProcessor( Class outClass) {
-        this(null, outClass);
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        Preconditions.checkNotNull(event);
-        Preconditions.checkNotNull(mapper);
-        Preconditions.checkNotNull(twitterJsonActivitySerializer);
-
-        if( outClass.equals( Activity.class )) {
-                LOGGER.debug("ACTIVITY");
-                result = twitterJsonActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-            // no supported conversion were applied
-        if( result != null )
-            return result;
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        // first check for valid json
-        ObjectNode node = (ObjectNode) entry.getDocument();
-
-        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
-
-        String json = null;
-        try {
-            json = mapper.writeValueAsString(node);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-
-        if( StringUtils.isNotEmpty(json)) {
-
-            // since data is coming from outside provider, we don't know what type the events are
-            Class inClass = TwitterEventClassifier.detectClass(json);
-
-            // if the target is string, just pass-through
-            if (java.lang.String.class.equals(outClass))
-                return Lists.newArrayList(new StreamsDatum(json));
-            else {
-                // convert to desired format
-                Object out = null;
-                try {
-                    out = convert(node, inClass, outClass);
-                } catch (ActivitySerializerException e) {
-                    LOGGER.warn("Failed deserializing", e);
-                    return Lists.newArrayList();
-                } catch (JsonProcessingException e) {
-                    LOGGER.warn("Failed parsing JSON", e);
-                    return Lists.newArrayList();
-                }
-
-                if (out != null && validate(out, outClass))
-                    return Lists.newArrayList(new StreamsDatum(out));
-            }
-        }
-
-        return Lists.newArrayList();
-
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = new StreamsJacksonMapper();
-        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
deleted file mode 100644
index 674eef1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
-    private ObjectMapper mapper = new StreamsTwitterMapper();
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    @Override
-    public void run() {
-
-        while(true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
-                if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                Thread.sleep(new Random().nextInt(100));
-
-                for( StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
-
-
-            } catch (Exception e) {
-                e.printStackTrace();
-
-            }
-        }
-    }
-
-    public StreamsDatum createStreamsDatum(User user) {
-        return new StreamsDatum(user, user.getIdStr());
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-        String item;
-        try {
-            // first check for valid json
-            // since data is coming from outside provider, we don't know what type the events are
-            if( entry.getDocument() instanceof String) {
-                item = (String) entry.getDocument();
-            } else {
-                item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
-            }
-
-            Class inClass = TwitterEventClassifier.detectClass(item);
-
-            User user;
-
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                Tweet tweet = mapper.readValue(item, Tweet.class);
-                user = tweet.getUser();
-                result.add(createStreamsDatum(user));
-            }
-            else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                Retweet retweet = mapper.readValue(item, Retweet.class);
-                user = retweet.getRetweetedStatus().getUser();
-                result.add(createStreamsDatum(user));
-            } else if ( inClass.equals( User.class )) {
-                LOGGER.debug("USER");
-                user = mapper.readValue(item, User.class);
-                result.add(createStreamsDatum(user));
-            } else {
-                return Lists.newArrayList();
-            }
-
-            return result;
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Error processing " + entry.toString());
-            return Lists.newArrayList();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
deleted file mode 100644
index 74cce27..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTypeConverter implements StreamsProcessor {
-
-    public final static String STREAMS_ID = "TwitterTypeConverter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
-
-    private ObjectMapper mapper;
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
-    private int count = 0;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public TwitterTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
-        inQueue = inputQueue;
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            LOGGER.debug("ACTIVITY");
-            result = twitterJsonActivitySerializer.deserialize(
-                    mapper.writeValueAsString(event));
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-        // no supported conversion were applied
-        if( result != null ) {
-            count ++;
-            return result;
-        }
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        StreamsDatum result = null;
-
-        try {
-
-            Object item = entry.getDocument();
-            ObjectNode node;
-
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
-            if( item instanceof String ) {
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass)) {
-                    result = entry;
-                }
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
-
-                    // since data is coming from outside provider, we don't know what type the events are
-                    Class inClass = TwitterEventClassifier.detectClass((String) item);
-
-                    Object out = convert(node, inClass, outClass);
-
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
-                }
-
-            } else if( item instanceof ObjectNode ) {
-
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
-
-                // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
-
-                Object out = convert(node, inClass, outClass);
-
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
-    }
-
-    @Override
-    public void prepare(Object o) {
-        mapper = new StreamsTwitterMapper();
-        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index 432a047..2234739 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -18,41 +18,38 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.*;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Created by sblackmon on 12/13/13.
  */
-public class TwitterEventClassifier {
+public class TwitterEventClassifier implements Serializable {
 
-    public static Class detectClass( String json ) {
+    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
+    public static Class detectClass( String json ) {
         Preconditions.checkNotNull(json);
         Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
-//        try {
-//            JsonAssert.with(json).assertNull("$.delete");
-//        } catch( AssertionError ae ) {
-//            return Delete.class;
-//        }
-//
-//        try {
-//            JsonAssert.with(json).assertNull("$.retweeted_status");
-//        } catch( AssertionError ae ) {
-//            return Retweet.class;
-//        }
-//
-//        return Tweet.class;
-
         ObjectNode objectNode;
         try {
-            objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json);
+            objectNode = (ObjectNode) mapper.readTree(json);
         } catch (IOException e) {
             e.printStackTrace();
             return null;
@@ -72,4 +69,31 @@ public class TwitterEventClassifier {
         else
             return Tweet.class;
     }
+    public static ActivitySerializer bestSerializer( String json ) {
+
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+        ObjectNode objectNode;
+        try {
+            objectNode = (ObjectNode) mapper.readTree(json);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+
+        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
+            return TwitterJsonRetweetActivitySerializer.getInstance();
+        else if( objectNode.findValue("delete") != null )
+            return TwitterJsonDeleteActivitySerializer.getInstance();
+//        else if( objectNode.findValue("friends") != null ||
+//                objectNode.findValue("friends_str") != null )
+//            return FriendList.class;
+        else if( objectNode.findValue("target_object") != null )
+            return TwitterJsonUserstreameventActivitySerializer.getInstance();
+        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
+            return TwitterJsonUserActivitySerializer.getInstance();
+        else
+            return TwitterJsonTweetActivitySerializer.getInstance();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 2ed16ba..395bd95 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -40,16 +40,21 @@ import java.io.IOException;
 
 /**
  * Created by sblackmon on 3/27/14.
+ *
+ * Deprecated: Use StreamsJacksonMapper
  */
+@Deprecated
 public class StreamsTwitterMapper extends StreamsJacksonMapper {
 
-    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+    public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+
+    public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT);
 
     public static final Long getMillis(String dateTime) {
 
         // this function is for pig which doesn't handle exceptions well
         try {
-            Long result = TWITTER_FORMAT.parseMillis(dateTime);
+            Long result = TWITTER_FORMATTER.parseMillis(dateTime);
             return result;
         } catch( Exception e ) {
             return null;
@@ -73,7 +78,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
                     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
                         DateTime result = null;
                         try {
-                            result = TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+                            result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString());
                         } catch( Exception e ) { }
                         try {
                             result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index 5b5be96..d1f0de9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -35,10 +35,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
 
     }
 
-    TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-    TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer();
+    private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer();
+
+    public static TwitterJsonActivitySerializer getInstance() {
+        return instance;
+    }
 
     @Override
     public String serializationFormat() {
@@ -53,18 +54,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
     @Override
     public Activity deserialize(String serialized) throws ActivitySerializerException {
 
-        Class documentSubType = TwitterEventClassifier.detectClass(serialized);
+        ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized);
+        Activity activity = serializer.deserialize(serialized);
 
-        Activity activity;
-        if( documentSubType == Tweet.class )
-            activity = tweetActivitySerializer.deserialize(serialized);
-        else if( documentSubType == Retweet.class )
-            activity = retweetActivitySerializer.deserialize(serialized);
-        else if( documentSubType == Delete.class )
-            activity = deleteActivitySerializer.deserialize(serialized);
-        else if( documentSubType == User.class )
-            activity = userActivitySerializer.deserialize(serialized);
-        else throw new ActivitySerializerException("unrecognized type");
+        if( activity == null )
+            throw new ActivitySerializerException("unrecognized type");
 
         return activity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index cb1618a..b368f71 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -46,6 +46,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 */
 public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
 
+    private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer();
+
+    public static TwitterJsonDeleteActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 4f141bb..58cb769 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -37,6 +37,12 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
 
     }
 
+    private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer();
+
+    public static TwitterJsonRetweetActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 8e58326..e6fc05f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -34,6 +34,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 
 public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
 
+    private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer();
+
+    public static TwitterJsonTweetActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
index 2ae5355..1bf935c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
@@ -36,6 +36,12 @@ public class TwitterJsonUserActivitySerializer implements ActivitySerializer<Str
 
     public TwitterJsonUserActivitySerializer() {}
 
+    private static TwitterJsonUserActivitySerializer instance = new TwitterJsonUserActivitySerializer();
+
+    public static TwitterJsonUserActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
index edaec01..e2832dd 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -44,6 +44,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 */
 public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> {
 
+    private static TwitterJsonUserstreameventActivitySerializer instance = new TwitterJsonUserstreameventActivitySerializer();
+
+    public static TwitterJsonUserstreameventActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 2d18db9..6b62fe3 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -21,13 +21,15 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.TypeConverterProcessor;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.junit.Assert;
@@ -55,14 +57,13 @@ import static org.junit.Assert.assertThat;
 public class SimpleTweetTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
 t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
 e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
 DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
 img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
 371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-
-    //    @Ignore
     @Test
     public void Tests()
     {
@@ -100,7 +101,7 @@ public class SimpleTweetTest {
         }
 
         try {
-            TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+            TypeConverterProcessor converter = new TypeConverterProcessor(String.class, Activity.class);
             converter.prepare(null);
             converter.process(new StreamsDatum(TWITTER_JSON));
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
index c7f6434..d6af4d9 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -21,7 +21,9 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
@@ -52,11 +54,11 @@ import static org.junit.Assert.assertThat;
 public class TweetActivitySerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-    //    @Ignore
     @Test
     public void Tests()
     {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index d0a6714..eba5fd0 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -54,11 +56,11 @@ import static org.junit.Assert.assertThat;
 public class TweetSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-    //    @Ignore
     @Test
     public void Tests()
     {