You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/14 01:28:59 UTC

[6/9] incubator-streams git commit: omni-bus update

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
index 7d7d547..b258095 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -18,7 +18,9 @@
 
 package org.apache.streams.datasift.serializer;
 
-import org.apache.streams.data.ActivitySerializer;
+import com.google.common.base.Preconditions;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.instagram.Instagram;
 import org.apache.streams.datasift.interaction.Interaction;
@@ -27,27 +29,29 @@ import org.apache.streams.datasift.twitter.Twitter;
 /**
  * Created by sblackmon on 11/6/14.
  */
-public class DatasiftEventClassifier {
+public class DatasiftEventClassifier implements DocumentClassifier {
 
-    public static Class detectClass(Datasift event) {
+    public DatasiftEventClassifier() {
 
-        if(event.getTwitter() != null) {
+    }
+
+    private static DatasiftEventClassifier instance = new DatasiftEventClassifier();
+
+    public static DatasiftEventClassifier getInstance() {
+        return instance;
+    }
+
+    public Class detectClass(Object document) {
+
+        Preconditions.checkArgument(document instanceof Datasift);
+        Datasift datasift = (Datasift)document;
+        if(datasift.getTwitter() != null) {
             return Twitter.class;
-        } else if(event.getInstagram() != null) {
+        } else if(datasift.getInstagram() != null) {
             return Instagram.class;
         } else {
             return Interaction.class;
         }
     }
 
-    public static ActivitySerializer bestSerializer(Datasift event) {
-
-        if(event.getTwitter() != null) {
-            return DatasiftTwitterActivitySerializer.getInstance();
-        } else if(event.getInstagram() != null) {
-            return DatasiftInstagramActivitySerializer.getInstance();
-        } else {
-            return DatasiftInteractionActivitySerializer.getInstance();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
new file mode 100644
index 0000000..eade439
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverter.java
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.serializer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.From;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftInstagramActivityConverter extends DatasiftInteractionActivityConverter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivityConverter.class);
+
+    private static DatasiftInstagramActivityConverter instance = new DatasiftInstagramActivityConverter();
+
+    public static DatasiftInstagramActivityConverter getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Activity convert(Datasift event) {
+        Activity activity = super.convert(event);
+
+        Instagram instagram = event.getInstagram();
+
+        activity.setActor(buildActor(event, instagram));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setProvider(InstagramActivityUtil.getProvider());
+        activity.setLinks(getLinks(event.getInstagram()));
+
+        activity.setVerb(selectVerb(event));
+        addInstagramExtensions(activity, instagram);
+        return activity;
+    }
+
+    /**
+     * Gets links from the object
+     * @return
+     */
+    private List<String> getLinks(Instagram instagram) {
+        List<String> result = Lists.newLinkedList();
+        if( instagram.getMedia() != null ) {
+            result.add(instagram.getMedia().getImage());
+            result.add(instagram.getMedia().getVideo());
+        }
+        return result;
+    }
+
+    public Actor buildActor(Datasift event, Instagram instagram) {
+        Actor actor = super.buildActor(event.getInteraction());
+        From user = instagram.getFrom();
+
+        actor.setDisplayName(firstStringIfNotNull(user.getFullName()));
+        actor.setId(formatId(Optional.fromNullable(
+                user.getId())
+                .or(actor.getId())));
+
+        Image profileImage = new Image();
+        String profileUrl = null;
+        profileUrl = user.getProfilePicture();
+        profileImage.setUrl(profileUrl);
+        actor.setImage(profileImage);
+
+        return actor;
+    }
+
+    public void addInstagramExtensions(Activity activity, Instagram instagram) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        List<String> hashTags;
+        if(instagram.getMedia() != null) {
+            hashTags = instagram.getMedia().getTags();
+            extensions.put("hashtags", hashTags);
+            extensions.put("keywords", activity.getContent());
+        } else {
+            extensions.put("keywords", activity.getContent());
+
+        }
+
+    }
+
+    private String selectVerb(Datasift event) {
+        if( event.getInteraction().getSubtype().equals("like"))
+            return "like";
+        else
+            return "post";
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
deleted file mode 100644
index d121d65..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.serializer;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.instagram.From;
-import org.apache.streams.datasift.instagram.Instagram;
-import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class);
-
-    private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer();
-
-    public static DatasiftInstagramActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public Activity convert(Datasift event) {
-        Activity activity = super.convert(event);
-
-        Instagram instagram = event.getInstagram();
-
-        activity.setActor(buildActor(event, instagram));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setProvider(InstagramActivityUtil.getProvider());
-        activity.setLinks(getLinks(event.getInstagram()));
-
-        activity.setVerb(selectVerb(event));
-        addInstagramExtensions(activity, instagram);
-        return activity;
-    }
-
-    /**
-     * Gets links from the object
-     * @return
-     */
-    private List<String> getLinks(Instagram instagram) {
-        List<String> result = Lists.newLinkedList();
-        if( instagram.getMedia() != null ) {
-            result.add(instagram.getMedia().getImage());
-            result.add(instagram.getMedia().getVideo());
-        }
-        return result;
-    }
-
-    public Actor buildActor(Datasift event, Instagram instagram) {
-        Actor actor = super.buildActor(event.getInteraction());
-        From user = instagram.getFrom();
-
-        actor.setDisplayName(firstStringIfNotNull(user.getFullName()));
-        actor.setId(formatId(Optional.fromNullable(
-                user.getId())
-                .or(actor.getId())));
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        profileUrl = user.getProfilePicture();
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        return actor;
-    }
-
-    public void addInstagramExtensions(Activity activity, Instagram instagram) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        List<String> hashTags;
-        if(instagram.getMedia() != null) {
-            hashTags = instagram.getMedia().getTags();
-            extensions.put("hashtags", hashTags);
-            extensions.put("keywords", activity.getContent());
-        } else {
-            extensions.put("keywords", activity.getContent());
-
-        }
-
-    }
-
-    private String selectVerb(Datasift event) {
-        if( event.getInteraction().getSubtype().equals("like"))
-            return "like";
-        else
-            return "post";
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
new file mode 100644
index 0000000..da21006
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverter.java
@@ -0,0 +1,222 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.links.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftInteractionActivityConverter implements ActivityConverter<Datasift> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivityConverter.class);
+
+    private static DatasiftInteractionActivityConverter instance = new DatasiftInteractionActivityConverter();
+
+    public static DatasiftInteractionActivityConverter getInstance() {
+        return instance;
+    }
+
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    @Override
+    public Datasift serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    public Activity deserialize(String datasiftJson) {
+        try {
+            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+            LOGGER.error("Exception : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Activity deserialize(Datasift serialized) {
+
+        try {
+
+            Activity activity = convert(serialized);
+
+            return activity;
+
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> datasifts) {
+        List<Activity> activities = Lists.newArrayList();
+        for( Datasift datasift : datasifts ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    public static Generator buildGenerator(Interaction interaction) {
+        Generator generator = new Generator();
+        generator.setDisplayName(interaction.getSource());
+        generator.setId(interaction.getSource());
+        return generator;
+    }
+
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:"+interaction.getType());
+        provider.setDisplayName(interaction.getType());
+        return provider;
+    }
+
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    public Activity convert(Datasift event) {
+
+        Preconditions.checkNotNull(event);
+        Preconditions.checkNotNull(event.getInteraction());
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event.getInteraction()));
+        activity.setVerb(selectVerb(event));
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(buildProvider(event.getInteraction()));
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        activity.setLinks(getLinks(event));
+        addDatasiftExtension(activity, event);
+        if( event.getInteraction().getGeo() != null) {
+            addLocationExtension(activity, event.getInteraction());
+        }
+        return activity;
+    }
+
+    private String selectVerb(Datasift event) {
+        return "post";
+    }
+
+    public Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
+        if(author == null) {
+            LOGGER.warn("Interaction does not contain author information.");
+            return actor;
+        }
+        String userName = author.getUsername();
+        String name = author.getName();
+        Long id  = author.getId();
+        if(userName != null) {
+            actor.setDisplayName(userName);
+        } else {
+            actor.setDisplayName(name);
+        }
+
+        if(id != null) {
+            actor.setId(id.toString());
+        } else {
+            if(userName != null)
+                actor.setId(userName);
+            else
+                actor.setId(name);
+        }
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if (interaction.getAuthor().getLink()!=null){
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        actObj.setUrl(interaction.getLink());
+        actObj.setId(formatId("post",interaction.getId()));
+        actObj.setContent(interaction.getContent());
+
+        return actObj;
+    }
+
+    public static List<String> getLinks(Datasift event) {
+        List<String> result = Lists.newArrayList();
+        Links links = event.getLinks();
+        if(links == null)
+            return null;
+        for(Object link : links.getNormalizedUrl()) {
+            if(link != null) {
+                if(link instanceof String) {
+                    result.add((String) link);
+                } else {
+                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
+                }
+            }
+        }
+        return result;
+    }
+
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+    public static String firstStringIfNotNull(List<Object> list) {
+        if( list != null && list.size() > 0) {
+            return (String) list.get(0);
+        } else return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
deleted file mode 100644
index c856dc2..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.links.Links;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftInteractionActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivitySerializer.class);
-
-    private static DatasiftInteractionActivitySerializer instance = new DatasiftInteractionActivitySerializer();
-
-    public static DatasiftInteractionActivitySerializer getInstance() {
-        return instance;
-    }
-
-    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+datasift.com.v1.1";
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
-    }
-
-    public Activity deserialize(String datasiftJson) {
-        try {
-            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
-            LOGGER.error("Exception : {}", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) {
-
-        try {
-
-            Activity activity = convert(serialized);
-
-            return activity;
-
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> datasifts) {
-        List<Activity> activities = Lists.newArrayList();
-        for( Datasift datasift : datasifts ) {
-            activities.add(deserialize(datasift));
-        }
-        return activities;
-    }
-
-    public static Generator buildGenerator(Interaction interaction) {
-        Generator generator = new Generator();
-        generator.setDisplayName(interaction.getSource());
-        generator.setId(interaction.getSource());
-        return generator;
-    }
-
-    public static Icon getIcon(Interaction interaction) {
-        return null;
-    }
-
-    public static Provider buildProvider(Interaction interaction) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:"+interaction.getType());
-        provider.setDisplayName(interaction.getType());
-        return provider;
-    }
-
-    public static String getUrls(Interaction interaction) {
-        return null;
-    }
-
-    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("datasift", datasift);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
-    }
-
-    public Activity convert(Datasift event) {
-
-        Preconditions.checkNotNull(event);
-        Preconditions.checkNotNull(event.getInteraction());
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event.getInteraction()));
-        activity.setVerb(selectVerb(event));
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(buildProvider(event.getInteraction()));
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        activity.setLinks(getLinks(event));
-        addDatasiftExtension(activity, event);
-        if( event.getInteraction().getGeo() != null) {
-            addLocationExtension(activity, event.getInteraction());
-        }
-        return activity;
-    }
-
-    private String selectVerb(Datasift event) {
-        return "post";
-    }
-
-    public Actor buildActor(Interaction interaction) {
-        Actor actor = new Actor();
-        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
-        if(author == null) {
-            LOGGER.warn("Interaction does not contain author information.");
-            return actor;
-        }
-        String userName = author.getUsername();
-        String name = author.getName();
-        Long id  = author.getId();
-        if(userName != null) {
-            actor.setDisplayName(userName);
-        } else {
-            actor.setDisplayName(name);
-        }
-
-        if(id != null) {
-            actor.setId(id.toString());
-        } else {
-            if(userName != null)
-                actor.setId(userName);
-            else
-                actor.setId(name);
-        }
-        Image image = new Image();
-        image.setUrl(interaction.getAuthor().getAvatar());
-        actor.setImage(image);
-        if (interaction.getAuthor().getLink()!=null){
-            actor.setUrl(interaction.getAuthor().getLink());
-        }
-        return actor;
-    }
-
-    public static ActivityObject buildActivityObject(Interaction interaction) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setObjectType(interaction.getContenttype());
-        actObj.setUrl(interaction.getLink());
-        actObj.setId(formatId("post",interaction.getId()));
-        actObj.setContent(interaction.getContent());
-
-        return actObj;
-    }
-
-    public static List<String> getLinks(Datasift event) {
-        List<String> result = Lists.newArrayList();
-        Links links = event.getLinks();
-        if(links == null)
-            return null;
-        for(Object link : links.getNormalizedUrl()) {
-            if(link != null) {
-                if(link instanceof String) {
-                    result.add((String) link);
-                } else {
-                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
-                }
-            }
-        }
-        return result;
-    }
-
-    public static ActivityObject buildTarget(Interaction interaction) {
-        return null;
-    }
-
-    public static void addLocationExtension(Activity activity, Interaction interaction) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<String, Object>();
-        Map<String, Double> coordinates = new HashMap<String, Double>();
-        coordinates.put("latitude", interaction.getGeo().getLatitude());
-        coordinates.put("longitude", interaction.getGeo().getLongitude());
-        location.put("coordinates", coordinates);
-        extensions.put("location", location);
-    }
-
-    public static String firstStringIfNotNull(List<Object> list) {
-        if( list != null && list.size() > 0) {
-            return (String) list.get(0);
-        } else return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverter.java
new file mode 100644
index 0000000..408e936
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverter.java
@@ -0,0 +1,272 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.datasift.serializer;
+
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Author;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
+import org.apache.streams.datasift.twitter.Retweet;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftTwitterActivityConverter extends DatasiftInteractionActivityConverter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTwitterActivityConverter.class);
+
+    private static DatasiftTwitterActivityConverter instance = new DatasiftTwitterActivityConverter();
+
+    public static DatasiftTwitterActivityConverter getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Activity convert(Datasift event) {
+        Activity activity = new Activity();
+        Twitter twitter = event.getTwitter();
+        boolean retweet = twitter.getRetweet() != null;
+
+        activity.setActor(buildActor(event, twitter)); //TODO
+        if(retweet) {
+            activity.setVerb("share");
+        } else {
+            activity.setVerb("post");
+        }
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(TwitterActivityUtil.getProvider());
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        if(retweet)
+            activity.setLinks(getLinks(twitter.getRetweet()));
+        else
+            activity.setLinks(getLinks(twitter));
+        addDatasiftExtension(activity, event);
+        if( twitter.getGeo() != null) {
+            addLocationExtension(activity, twitter);
+        }
+        addTwitterExtensions(activity, twitter, event.getInteraction());
+        return activity;
+    }
+
+    /**
+     * Get the links from this tweet as a list
+     * @param twitter
+     * @return the links from the tweet
+     */
+    public List<String> getLinks(Twitter twitter) {
+        return getLinks(twitter.getLinks());
+    }
+
+    /**
+     * Get the links from this tweet as a list
+     * @param retweet
+     * @return the links from the tweet
+     */
+    public List<String> getLinks(Retweet retweet) {
+        return getLinks(retweet.getLinks());
+    }
+
+    /**
+     * Converts the list of objects to a list of strings
+     * @param links
+     * @return
+     */
+    private List<String> getLinks(List<Object> links) {
+        if(links == null)
+            return Lists.newArrayList();
+        List<String> result = Lists.newLinkedList();
+        for(Object obj : links) {
+            if(obj instanceof String) {
+                result.add((String) obj);
+            } else {
+                LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
+            }
+        }
+        return result;
+    }
+
+    public Actor buildActor(Datasift event, Twitter twitter) {
+        DatasiftTwitterUser user = twitter.getUser();
+        Actor actor = super.buildActor(event.getInteraction());
+        if(user == null) {
+            user = twitter.getRetweet().getUser();
+        }
+
+        actor.setDisplayName(user.getName());
+        actor.setId(formatId(Optional.fromNullable(
+                user.getIdStr())
+                .or(Optional.of(user.getId().toString()))
+                .orNull()));
+        actor.setSummary(user.getDescription());
+        try {
+            actor.setPublished(user.getCreatedAt());
+        } catch (Exception e) {
+            LOGGER.warn("Exception trying to parse date : {}", e);
+        }
+
+        if(user.getUrl() != null) {
+            actor.setUrl(user.getUrl());
+        }
+
+        Map<String, Object> extensions = new HashMap<String,Object>();
+        extensions.put("location", user.getLocation());
+        extensions.put("posts", user.getStatusesCount());
+        extensions.put("followers", user.getFollowersCount());
+        extensions.put("screenName", user.getScreenName());
+        if(user.getAdditionalProperties() != null) {
+            extensions.put("favorites", user.getFavouritesCount());
+        }
+
+        Image profileImage = new Image();
+        String profileUrl = null;
+        Author author = event.getInteraction().getAuthor();
+        if( author != null )
+            profileUrl = author.getAvatar();
+        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
+            Object url = user.getProfileImageUrlHttps();
+            if(url instanceof String)
+                profileUrl = (String) url;
+        }
+        if(profileUrl == null) {
+            profileUrl = user.getProfileImageUrl();
+        }
+        profileImage.setUrl(profileUrl);
+        actor.setImage(profileImage);
+
+        actor.setAdditionalProperty("extensions", extensions);
+        return actor;
+    }
+
+    public void addLocationExtension(Activity activity, Twitter twitter) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = Maps.newHashMap();
+        double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
+        Map<String, Object> coords = Maps.newHashMap();
+        coords.put("coordinates", coordiantes);
+        coords.put("type", "geo_point");
+        location.put("coordinates", coords);
+        extensions.put("location", location);
+    }
+
+    public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
+        Retweet retweet = twitter.getRetweet();
+        Map<String, Object> extensions = ensureExtensions(activity);
+        List<String> hashTags = Lists.newLinkedList();
+        List<Object> hts = Lists.newLinkedList();
+        if(twitter.getHashtags() != null) {
+            hts = twitter.getHashtags();
+        } else if (retweet != null) {
+            hts = retweet.getHashtags();
+        }
+        if(hts != null) {
+            for(Object ht : twitter.getHashtags()) {
+                if(ht instanceof String) {
+                    hashTags.add((String) ht);
+                } else {
+                    LOGGER.warn("Hashtag was not instance of String : {}", ht.getClass().getName());
+                }
+            }
+        }
+        extensions.put("hashtags", hashTags);
+
+
+        if(retweet != null) {
+            Map<String, Object> rebroadcasts = Maps.newHashMap();
+            rebroadcasts.put("perspectival", true);
+            rebroadcasts.put("count", retweet.getCount());
+            extensions.put("rebroadcasts", rebroadcasts);
+        }
+
+        if(interaction.getAdditionalProperties() != null) {
+            ArrayList<Map<String,Object>> userMentions = createUserMentions(interaction);
+
+            if(userMentions.size() > 0)
+                extensions.put("user_mentions", userMentions);
+        }
+
+        extensions.put("keywords", interaction.getContent());
+    }
+
+    /**
+     * Returns an ArrayList of all UserMentions in this interaction
+     * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
+     * If those lists are the same size, then they will be merged into individual UserMention
+     * objects. However, if they are not the same size, a new UserMention object will be created
+     * for each entry in both lists.
+     *
+     * @param interaction
+     * @return
+     */
+    private ArrayList<Map<String,Object>> createUserMentions(Interaction interaction) {
+        ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+        ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+        ArrayList<Map<String,Object>> userMentions = new ArrayList<Map<String,Object>>();
+
+        if(mentions != null && !mentions.isEmpty()) {
+            for(int x = 0; x < mentions.size(); x ++) {
+                Map<String, Object> actor = new HashMap<String, Object>();
+                actor.put("displayName", mentions.get(x));
+                actor.put("handle", mentions.get(x));
+
+                userMentions.add(actor);
+            }
+        }
+        if(mentionIds != null && !mentionIds.isEmpty()) {
+            for(int x = 0; x < mentionIds.size(); x ++) {
+                Map<String, Object> actor = new HashMap<String, Object>();
+                actor.put("id", "id:twitter:" + mentionIds.get(x));
+
+                userMentions.add(actor);
+            }
+        }
+
+        return userMentions;
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
deleted file mode 100644
index 8ac84f6..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.streams.datasift.serializer;
-
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Author;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
-import org.apache.streams.datasift.twitter.Retweet;
-import org.apache.streams.datasift.twitter.Twitter;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftTwitterActivitySerializer extends DatasiftInteractionActivitySerializer {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTwitterActivitySerializer.class);
-
-    private static DatasiftTwitterActivitySerializer instance = new DatasiftTwitterActivitySerializer();
-
-    public static DatasiftTwitterActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public Activity convert(Datasift event) {
-        Activity activity = new Activity();
-        Twitter twitter = event.getTwitter();
-        boolean retweet = twitter.getRetweet() != null;
-
-        activity.setActor(buildActor(event, twitter)); //TODO
-        if(retweet) {
-            activity.setVerb("share");
-        } else {
-            activity.setVerb("post");
-        }
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(TwitterActivityUtil.getProvider());
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        if(retweet)
-            activity.setLinks(getLinks(twitter.getRetweet()));
-        else
-            activity.setLinks(getLinks(twitter));
-        addDatasiftExtension(activity, event);
-        if( twitter.getGeo() != null) {
-            addLocationExtension(activity, twitter);
-        }
-        addTwitterExtensions(activity, twitter, event.getInteraction());
-        return activity;
-    }
-
-    /**
-     * Get the links from this tweet as a list
-     * @param twitter
-     * @return the links from the tweet
-     */
-    public List<String> getLinks(Twitter twitter) {
-        return getLinks(twitter.getLinks());
-    }
-
-    /**
-     * Get the links from this tweet as a list
-     * @param retweet
-     * @return the links from the tweet
-     */
-    public List<String> getLinks(Retweet retweet) {
-        return getLinks(retweet.getLinks());
-    }
-
-    /**
-     * Converts the list of objects to a list of strings
-     * @param links
-     * @return
-     */
-    private List<String> getLinks(List<Object> links) {
-        if(links == null)
-            return Lists.newArrayList();
-        List<String> result = Lists.newLinkedList();
-        for(Object obj : links) {
-            if(obj instanceof String) {
-                result.add((String) obj);
-            } else {
-                LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
-            }
-        }
-        return result;
-    }
-
-    public Actor buildActor(Datasift event, Twitter twitter) {
-        DatasiftTwitterUser user = twitter.getUser();
-        Actor actor = super.buildActor(event.getInteraction());
-        if(user == null) {
-            user = twitter.getRetweet().getUser();
-        }
-
-        actor.setDisplayName(user.getName());
-        actor.setId(formatId(Optional.fromNullable(
-                user.getIdStr())
-                .or(Optional.of(user.getId().toString()))
-                .orNull()));
-        actor.setSummary(user.getDescription());
-        try {
-            actor.setPublished(user.getCreatedAt());
-        } catch (Exception e) {
-            LOGGER.warn("Exception trying to parse date : {}", e);
-        }
-
-        if(user.getUrl() != null) {
-            actor.setUrl(user.getUrl());
-        }
-
-        Map<String, Object> extensions = new HashMap<String,Object>();
-        extensions.put("location", user.getLocation());
-        extensions.put("posts", user.getStatusesCount());
-        extensions.put("followers", user.getFollowersCount());
-        extensions.put("screenName", user.getScreenName());
-        if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getFavouritesCount());
-        }
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        Author author = event.getInteraction().getAuthor();
-        if( author != null )
-            profileUrl = author.getAvatar();
-        if(profileUrl == null && user.getProfileImageUrlHttps() != null) {
-            Object url = user.getProfileImageUrlHttps();
-            if(url instanceof String)
-                profileUrl = (String) url;
-        }
-        if(profileUrl == null) {
-            profileUrl = user.getProfileImageUrl();
-        }
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        actor.setAdditionalProperty("extensions", extensions);
-        return actor;
-    }
-
-    public void addLocationExtension(Activity activity, Twitter twitter) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = Maps.newHashMap();
-        double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
-        Map<String, Object> coords = Maps.newHashMap();
-        coords.put("coordinates", coordiantes);
-        coords.put("type", "geo_point");
-        location.put("coordinates", coords);
-        extensions.put("location", location);
-    }
-
-    public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
-        Retweet retweet = twitter.getRetweet();
-        Map<String, Object> extensions = ensureExtensions(activity);
-        List<String> hashTags = Lists.newLinkedList();
-        List<Object> hts = Lists.newLinkedList();
-        if(twitter.getHashtags() != null) {
-            hts = twitter.getHashtags();
-        } else if (retweet != null) {
-            hts = retweet.getHashtags();
-        }
-        if(hts != null) {
-            for(Object ht : twitter.getHashtags()) {
-                if(ht instanceof String) {
-                    hashTags.add((String) ht);
-                } else {
-                    LOGGER.warn("Hashtag was not instance of String : {}", ht.getClass().getName());
-                }
-            }
-        }
-        extensions.put("hashtags", hashTags);
-
-
-        if(retweet != null) {
-            Map<String, Object> rebroadcasts = Maps.newHashMap();
-            rebroadcasts.put("perspectival", true);
-            rebroadcasts.put("count", retweet.getCount());
-            extensions.put("rebroadcasts", rebroadcasts);
-        }
-
-        if(interaction.getAdditionalProperties() != null) {
-            ArrayList<Map<String,Object>> userMentions = createUserMentions(interaction);
-
-            if(userMentions.size() > 0)
-                extensions.put("user_mentions", userMentions);
-        }
-
-        extensions.put("keywords", interaction.getContent());
-    }
-
-    /**
-     * Returns an ArrayList of all UserMentions in this interaction
-     * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
-     * If those lists are the same size, then they will be merged into individual UserMention
-     * objects. However, if they are not the same size, a new UserMention object will be created
-     * for each entry in both lists.
-     *
-     * @param interaction
-     * @return
-     */
-    private ArrayList<Map<String,Object>> createUserMentions(Interaction interaction) {
-        ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
-        ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
-        ArrayList<Map<String,Object>> userMentions = new ArrayList<Map<String,Object>>();
-
-        if(mentions != null && !mentions.isEmpty()) {
-            for(int x = 0; x < mentions.size(); x ++) {
-                Map<String, Object> actor = new HashMap<String, Object>();
-                actor.put("displayName", mentions.get(x));
-                actor.put("handle", mentions.get(x));
-
-                userMentions.add(actor);
-            }
-        }
-        if(mentionIds != null && !mentionIds.isEmpty()) {
-            for(int x = 0; x < mentionIds.size(); x ++) {
-                Map<String, Object> actor = new HashMap<String, Object>();
-                actor.put("id", "id:twitter:" + mentionIds.get(x));
-
-                userMentions.add(actor);
-            }
-        }
-
-        return userMentions;
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
index 750915e..aa6a587 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
@@ -43,7 +43,7 @@ public class DatasiftSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT);
 
     @Test
     public void Tests()

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivityConverterTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivityConverterTest.java
new file mode 100644
index 0000000..057875f
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivityConverterTest.java
@@ -0,0 +1,85 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftActivityConverterTest {
+
+    protected ActivityConverter SERIALIZER;
+
+    protected static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(StreamsDatasiftMapper.DATASIFT_FORMAT);
+
+    @Before
+    public void initSerializer() {
+        SERIALIZER = new DatasiftActivityConverter();
+    }
+
+    @Test
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivityConverterTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            try {
+                line = scanner.nextLine();
+                Datasift item = MAPPER.readValue(line, Datasift.class);
+                testConversion(item);
+                String json = MAPPER.writeValueAsString(item);
+                testDeserNoNull(json);
+                testDeserNoAddProps(json);
+            } catch (Exception e) {
+                System.err.println(line);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Test that the minimum number of things that an activity has
+     * @param item
+     */
+    protected void testConversion(Datasift item) throws Exception {
+        Activity activity = SERIALIZER.deserialize(item);
+        assertNotNull("activity.id", activity.getId());
+        assertNotNull("activity.published", activity.getPublished());
+        assertNotNull("activity.provider", activity.getProvider());
+        assertNotNull("activity.url", activity.getUrl());
+        assertNotNull("activity.verb", activity.getVerb());
+        Actor actor = activity.getActor();
+        assertNotNull("activity.actor", actor);
+    }
+
+    /**
+     * Test that null fields are not present
+     * @param json
+     */
+    protected void testDeserNoNull(String json) throws Exception {
+        int nulls = StringUtils.countMatches(json, ":null");
+        assertEquals(0l, (long)nulls);
+
+    }
+
+    /**
+     * Test that null fields are not present
+     * @param json
+     */
+    protected void testDeserNoAddProps(String json) throws Exception {
+        int nulls = StringUtils.countMatches(json, "additionalProperties:{");
+        assertEquals(0l, (long)nulls);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
deleted file mode 100644
index 8f7ad43..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class DatasiftActivitySerializerTest {
-
-    protected ActivitySerializer SERIALIZER;
-
-    protected static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
-
-    @Before
-    public void initSerializer() {
-        SERIALIZER = new DatasiftActivitySerializer();
-    }
-
-    @Test
-    public void testConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            try {
-                line = scanner.nextLine();
-                Datasift item = MAPPER.readValue(line, Datasift.class);
-                testConversion(item);
-                String json = MAPPER.writeValueAsString(item);
-                testDeserNoNull(json);
-                testDeserNoAddProps(json);
-            } catch (Exception e) {
-                System.err.println(line);
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * Test that the minimum number of things that an activity has
-     * @param item
-     */
-    protected void testConversion(Datasift item) throws Exception {
-        Activity activity = SERIALIZER.deserialize(item);
-        assertNotNull("activity.id", activity.getId());
-        assertNotNull("activity.published", activity.getPublished());
-        assertNotNull("activity.provider", activity.getProvider());
-        assertNotNull("activity.url", activity.getUrl());
-        assertNotNull("activity.verb", activity.getVerb());
-        Actor actor = activity.getActor();
-        assertNotNull("activity.actor", actor);
-    }
-
-    /**
-     * Test that null fields are not present
-     * @param json
-     */
-    protected void testDeserNoNull(String json) throws Exception {
-        int nulls = StringUtils.countMatches(json, ":null");
-        assertEquals(0l, (long)nulls);
-
-    }
-
-    /**
-     * Test that null fields are not present
-     * @param json
-     */
-    protected void testDeserNoAddProps(String json) throws Exception {
-        int nulls = StringUtils.countMatches(json, "additionalProperties:{");
-        assertEquals(0l, (long)nulls);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
index fda57c4..48aaeef 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -20,11 +20,14 @@ package org.apache.streams.datasift.serializer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityConverterFactory;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.instagram.Instagram;
 import org.apache.streams.datasift.twitter.Twitter;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
 import org.junit.Test;
 
 import java.util.Scanner;
@@ -38,25 +41,33 @@ public class DatasiftEventClassifierTest {
 
     @Test
     public void testTwitterDetection() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        Scanner scanner = new Scanner(DatasiftActivityConverterTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
         String line = null;
         while(scanner.hasNextLine()) {
             line = scanner.nextLine();
             Datasift datasift = MAPPER.readValue(line, Datasift.class);
-            assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
-            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTwitterActivitySerializer);
+            Class detectedClass = DatasiftEventClassifier.getInstance().detectClass(datasift);
+            assert(detectedClass == Twitter.class);
+            Class converterClass = DatasiftConverterResolver.getInstance().bestSerializer(detectedClass);
+            assert(converterClass == DatasiftTwitterActivityConverter.class);
+            ActivityConverter detectedConverter = ActivityConverterFactory.getInstance(converterClass);
+            assert(detectedConverter instanceof DatasiftTwitterActivityConverter);
         }
     }
 
     @Test
     public void testInstagramDetection() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        Scanner scanner = new Scanner(DatasiftActivityConverterTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
         String line = null;
         while(scanner.hasNextLine()) {
             line = scanner.nextLine();
             Datasift datasift = MAPPER.readValue(line, Datasift.class);
-            assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class);
-            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer);
+            Class detectedClass = DatasiftEventClassifier.getInstance().detectClass(datasift);
+            assert(detectedClass == Instagram.class);
+            Class converterClass = DatasiftConverterResolver.getInstance().bestSerializer(detectedClass);
+            assert(converterClass == DatasiftInstagramActivityConverter.class);
+            ActivityConverter detectedConverter = ActivityConverterFactory.getInstance(converterClass);
+            assert(detectedConverter instanceof DatasiftInstagramActivityConverter);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverterTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverterTest.java
new file mode 100644
index 0000000..8199319
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivityConverterTest.java
@@ -0,0 +1,35 @@
+package org.apache.streams.datasift.serializer;
+
+import org.apache.streams.datasift.Datasift;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftInstagramActivityConverterTest extends DatasiftActivityConverterTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftInstagramActivityConverter();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivityConverterTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift item = MAPPER.readValue(line, Datasift.class);
+            testConversion(item);
+            String json = MAPPER.writeValueAsString(item);
+            testDeserNoNull(json);
+            testDeserNoAddProps(json);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
deleted file mode 100644
index 5350d74..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializerTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class DatasiftInstagramActivitySerializerTest extends DatasiftActivitySerializerTest {
-
-    @Before
-    @Override
-    public void initSerializer() {
-        SERIALIZER = new DatasiftInstagramActivitySerializer();
-    }
-
-    @Test
-    @Override
-    public void testConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            line = scanner.nextLine();
-            Datasift item = MAPPER.readValue(line, Datasift.class);
-            testConversion(item);
-            String json = MAPPER.writeValueAsString(item);
-            testDeserNoNull(json);
-            testDeserNoAddProps(json);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverterTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverterTest.java
new file mode 100644
index 0000000..c79db35
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivityConverterTest.java
@@ -0,0 +1,40 @@
+package org.apache.streams.datasift.serializer;
+
+import org.apache.streams.datasift.Datasift;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftInteractionActivityConverterTest extends DatasiftActivityConverterTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftInteractionActivityConverter();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftInteractionActivityConverterTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            try {
+                line = scanner.nextLine();
+                Datasift item = MAPPER.readValue(line, Datasift.class);
+                testConversion(item);
+                String json = MAPPER.writeValueAsString(item);
+                testDeserNoNull(json);
+                testDeserNoAddProps(json);
+            } catch (Exception e) {
+                System.err.println(line);
+                throw e;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
deleted file mode 100644
index 21d4ebb..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializerTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class DatasiftInteractionActivitySerializerTest extends DatasiftActivitySerializerTest {
-
-    @Before
-    @Override
-    public void initSerializer() {
-        SERIALIZER = new DatasiftInteractionActivitySerializer();
-    }
-
-    @Test
-    @Override
-    public void testConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftInteractionActivitySerializerTest.class.getResourceAsStream("/rand_sample_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            try {
-                line = scanner.nextLine();
-                Datasift item = MAPPER.readValue(line, Datasift.class);
-                testConversion(item);
-                String json = MAPPER.writeValueAsString(item);
-                testDeserNoNull(json);
-                testDeserNoAddProps(json);
-            } catch (Exception e) {
-                System.err.println(line);
-                throw e;
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverterTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverterTest.java
new file mode 100644
index 0000000..3b123ec
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivityConverterTest.java
@@ -0,0 +1,35 @@
+package org.apache.streams.datasift.serializer;
+
+import org.apache.streams.datasift.Datasift;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DatasiftTwitterActivityConverterTest extends DatasiftActivityConverterTest {
+
+    @Before
+    @Override
+    public void initSerializer() {
+        SERIALIZER = new DatasiftTwitterActivityConverter();
+    }
+
+    @Test
+    @Override
+    public void testConversion() throws Exception {
+        Scanner scanner = new Scanner(DatasiftTwitterActivityConverterTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift item = MAPPER.readValue(line, Datasift.class);
+            testConversion(item);
+            String json = MAPPER.writeValueAsString(item);
+            testDeserNoNull(json);
+            testDeserNoAddProps(json);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
deleted file mode 100644
index 33b1f77..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializerTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class DatasiftTwitterActivitySerializerTest extends DatasiftActivitySerializerTest {
-
-    @Before
-    @Override
-    public void initSerializer() {
-        SERIALIZER = new DatasiftTwitterActivitySerializer();
-    }
-
-    @Test
-    @Override
-    public void testConversion() throws Exception {
-        Scanner scanner = new Scanner(DatasiftTwitterActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
-        String line = null;
-        while(scanner.hasNextLine()) {
-            line = scanner.nextLine();
-            Datasift item = MAPPER.readValue(line, Datasift.class);
-            testConversion(item);
-            String json = MAPPER.writeValueAsString(item);
-            testDeserNoNull(json);
-            testDeserNoAddProps(json);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivityConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivityConverter.java
new file mode 100644
index 0000000..9f74909
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivityConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.facebook.api;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.serializer.FacebookActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.*;
+import org.apache.streams.facebook.Page;
+
+import java.util.List;
+
+/**
+ * Serializes activity posts
+ *   sblackmon: This class needs a rewrite
+ */
+public class FacebookPageActivityConverter implements ActivityConverter<Page> {
+
+    public FacebookPageActivityConverter() {
+
+    }
+
+    private static FacebookPageActivityConverter instance = new FacebookPageActivityConverter();
+
+    public static FacebookPageActivityConverter getInstance() {
+        return instance;
+    }
+
+    public static ObjectMapper mapper;
+    static {
+        mapper = StreamsJacksonMapper.getInstance();
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "facebook_post_json_v1";
+    }
+
+    @Override
+    public Page serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException("Not currently supported by this deserializer");
+    }
+
+    @Override
+    public Activity deserialize(Page page) throws ActivitySerializerException {
+        Activity activity = new Activity();
+
+        FacebookActivityUtil.updateActivity(page, activity);
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Page> serializedList) {
+        throw new NotImplementedException("Not currently supported by this deserializer");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
deleted file mode 100644
index f59ab1e..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.facebook.api;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.facebook.serializer.FacebookActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.*;
-import org.apache.streams.facebook.Page;
-
-import java.util.List;
-
-/**
- * Serializes activity posts
- *   sblackmon: This class needs a rewrite
- */
-public class FacebookPageActivitySerializer implements ActivitySerializer<Page> {
-
-    public static ObjectMapper mapper;
-    static {
-        mapper = StreamsJacksonMapper.getInstance();
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "facebook_post_json_v1";
-    }
-
-    @Override
-    public Page serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
-
-    @Override
-    public Activity deserialize(Page page) throws ActivitySerializerException {
-        Activity activity = new Activity();
-
-        FacebookActivityUtil.updateActivity(page, activity);
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Page> serializedList) {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
-}
\ No newline at end of file