You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 00:29:19 UTC

[2/2] incubator-streams git commit: use StreamsJacksonMapper with channel-specific date support deprecate StreamsTwitterMapper deprecate StreamsDatasiftMapper use generic TypeConverterProcessor deprecate/delete now unnecessary classes refactor event classification add event classification tests refactor channel serialization update tests

use StreamsJacksonMapper with channel-specific date support
deprecate StreamsTwitterMapper
deprecate StreamsDatasiftMapper
use generic TypeConverterProcessor
deprecate/delete now unnecessary classes
refactor event classification
add event classification tests
refactor channel serialization
update tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f6a6574
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f6a6574
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f6a6574

Branch: refs/heads/STREAMS-212
Commit: 2f6a6574e1a3d87fd6ae1457f35ce663fc914173
Parents: 9aebd0b
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 6 15:27:52 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 6 15:27:52 2014 -0800

----------------------------------------------------------------------
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  12 +-
 .../DatasiftDefaultActivitySerializer.java      | 214 -------------------
 .../DatasiftInstagramActivitySerializer.java    |   8 +-
 .../DatasiftTweetActivitySerializer.java        |   8 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  11 +-
 .../com/datasift/test/DatasiftSerDeTest.java    |  18 +-
 .../DatasiftTypeConverterProcessorTest.java     |  72 -------
 .../DatasiftActivitySerializerTest.java         |   6 +-
 .../serializer/DatasiftEventClassifierTest.java |  76 +++++++
 .../streams-provider-twitter/pom.xml            |   6 +
 .../processor/TwitterEventProcessor.java        | 194 -----------------
 .../processor/TwitterProfileProcessor.java      | 133 ------------
 .../twitter/processor/TwitterTypeConverter.java | 209 ------------------
 .../provider/TwitterEventClassifier.java        |  58 +++--
 .../serializer/StreamsTwitterMapper.java        |  11 +-
 .../TwitterJsonActivitySerializer.java          |  24 +--
 .../TwitterJsonDeleteActivitySerializer.java    |   6 +
 .../TwitterJsonRetweetActivitySerializer.java   |   6 +
 .../TwitterJsonTweetActivitySerializer.java     |   6 +
 .../TwitterJsonUserActivitySerializer.java      |   6 +
 ...erJsonUserstreameventActivitySerializer.java |   6 +
 .../streams/twitter/test/SimpleTweetTest.java   |  11 +-
 .../twitter/test/TweetActivitySerDeTest.java    |   6 +-
 .../streams/twitter/test/TweetSerDeTest.java    |   6 +-
 .../test/TwitterEventClassifierTest.java        |  34 +++
 26 files changed, 261 insertions(+), 893 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
index a00cf23..1166b2e 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
@@ -57,7 +57,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
         List<StreamsDatum> result = Lists.newLinkedList();
         Object doc;
         try {
-            doc = this.converter.convert(entry.getDocument(), this.mapper);
+            if( entry.getDocument() instanceof String ) {
+                ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class);
+                doc = this.converter.convert(node, this.mapper);
+            } else {
+                doc = this.converter.convert(entry.getDocument(), this.mapper);
+            }
             if(doc != null) {
                 result.add(new StreamsDatum(doc, entry.getId()));
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
index 0ada979..b587cd6 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
@@ -32,9 +32,6 @@ import java.util.List;
  */
 public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> {
 
-    private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer();
-    private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer();
-    private static final DatasiftInstagramActivitySerializer INSTAGRAM_SERIALIZER = new DatasiftInstagramActivitySerializer();
     private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
 
     @Override
@@ -49,13 +46,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>
 
     @Override
     public Activity deserialize(Datasift serialized) throws ActivitySerializerException {
-        if(serialized.getTwitter() != null) {
-            return TWITTER_SERIALIZER.deserialize(serialized);
-        } else if(serialized.getInstagram() != null) {
-            return INSTAGRAM_SERIALIZER.deserialize(serialized);
-        } else {
-            return DEFAULT_SERIALIZER.deserialize(serialized);
-        }
+        ActivitySerializer serializer = DatasiftEventClassifier.bestSerializer(serialized);
+        return serializer.deserialize(serialized);
     }
 
     public Activity deserialize(String json) throws ActivitySerializerException {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
deleted file mode 100644
index 678f67b..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.links.Links;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
- *
- */
-public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class);
-
-    public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
-    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+datasift.com.v1.1";
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
-    }
-
-    public Activity deserialize(String datasiftJson) {
-        try {
-            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
-            LOGGER.error("Exception : {}", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) {
-
-        try {
-
-            Activity activity = convert(serialized);
-
-            return activity;
-
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> datasifts) {
-        List<Activity> activities = Lists.newArrayList();
-        for( Datasift datasift : datasifts ) {
-            activities.add(deserialize(datasift));
-        }
-        return activities;
-    }
-
-    public static Generator buildGenerator(Interaction interaction) {
-        Generator generator = new Generator();
-        generator.setDisplayName(interaction.getSource());
-        generator.setId(interaction.getSource());
-        return generator;
-    }
-
-    public static Icon getIcon(Interaction interaction) {
-        return null;
-    }
-
-    public static Provider buildProvider(Interaction interaction) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:"+interaction.getType());
-        provider.setDisplayName(interaction.getType());
-        return provider;
-    }
-
-    public static String getUrls(Interaction interaction) {
-        return null;
-    }
-
-    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("datasift", datasift);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
-    }
-
-    public Activity convert(Datasift event) {
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event.getInteraction()));
-        activity.setVerb(selectVerb(event));
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(buildProvider(event.getInteraction()));
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        activity.setLinks(getLinks(event));
-        addDatasiftExtension(activity, event);
-        if( event.getInteraction().getGeo() != null) {
-            addLocationExtension(activity, event.getInteraction());
-        }
-        return activity;
-    }
-
-    private String selectVerb(Datasift event) {
-        return "post";
-    }
-
-    public Actor buildActor(Interaction interaction) {
-        Actor actor = new Actor();
-        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
-        if(author == null) {
-            LOGGER.warn("Interaction does not contain author information.");
-            return actor;
-        }
-        String userName = author.getUsername();
-        String name = author.getName();
-        Long id  = author.getId();
-        if(userName != null) {
-            actor.setDisplayName(userName);
-        } else {
-            actor.setDisplayName(name);
-        }
-
-        if(id != null) {
-            actor.setId(id.toString());
-        } else {
-            if(userName != null)
-                actor.setId(userName);
-            else
-                actor.setId(name);
-        }
-        Image image = new Image();
-        image.setUrl(interaction.getAuthor().getAvatar());
-        actor.setImage(image);
-        if (interaction.getAuthor().getLink()!=null){
-            actor.setUrl(interaction.getAuthor().getLink());
-        }
-        return actor;
-    }
-
-    public static ActivityObject buildActivityObject(Interaction interaction) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setObjectType(interaction.getContenttype());
-        actObj.setUrl(interaction.getLink());
-        actObj.setId(formatId("post",interaction.getId()));
-        actObj.setContent(interaction.getContent());
-
-        return actObj;
-    }
-
-    public static List<String> getLinks(Datasift event) {
-        List<String> result = Lists.newArrayList();
-        Links links = event.getLinks();
-        if(links == null)
-            return null;
-        for(Object link : links.getNormalizedUrl()) {
-            if(link != null) {
-                if(link instanceof String) {
-                    result.add((String) link);
-                } else {
-                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
-                }
-            }
-        }
-        return result;
-    }
-
-    public static ActivityObject buildTarget(Interaction interaction) {
-        return null;
-    }
-
-    public static void addLocationExtension(Activity activity, Interaction interaction) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<String, Object>();
-        Map<String, Double> coordinates = new HashMap<String, Double>();
-        coordinates.put("latitude", interaction.getGeo().getLatitude());
-        coordinates.put("longitude", interaction.getGeo().getLongitude());
-        location.put("coordinates", coordinates);
-        extensions.put("location", location);
-    }
-
-    public static String firstStringIfNotNull(List<Object> list) {
-        if( list != null && list.size() > 0) {
-            return (String) list.get(0);
-        } else return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
index cb44df2..d121d65 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java
@@ -39,10 +39,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 /**
  *
  */
-public class DatasiftInstagramActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class);
 
+    private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer();
+
+    public static DatasiftInstagramActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public Activity convert(Datasift event) {
         Activity activity = super.convert(event);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3c7abda..6fd19e7 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -46,10 +46,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 /**
  *
  */
-public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer {
+public class DatasiftTweetActivitySerializer extends DatasiftInteractionActivitySerializer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTweetActivitySerializer.class);
 
+    private static DatasiftTweetActivitySerializer instance = new DatasiftTweetActivitySerializer();
+
+    public static DatasiftTweetActivitySerializer getInstance() {
+        return instance;
+    }
+
     @Override
     public Activity convert(Datasift event) {
         Activity activity = new Activity();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
index c5f2abf..93ab28b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java
@@ -33,16 +33,21 @@ import java.io.IOException;
 
 /**
  * Created by sblackmon on 3/27/14.
+ *
+ * Deprecated: Use StreamsJacksonMapper instead
  */
+@Deprecated()
 public class StreamsDatasiftMapper extends StreamsJacksonMapper {
 
-    public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+    public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+
+    public static final DateTimeFormatter DATASIFT_FORMATTER = DateTimeFormat.forPattern(DATASIFT_FORMAT);
 
     public static final Long getMillis(String dateTime) {
 
         // this function is for pig which doesn't handle exceptions well
         try {
-            Long result = DATASIFT_FORMAT.parseMillis(dateTime);
+            Long result = DATASIFT_FORMATTER.parseMillis(dateTime);
             return result;
         } catch( Exception e ) {
             return null;
@@ -66,7 +71,7 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper {
                     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
                         DateTime result = null;
                         try {
-                            result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+                            result = DATASIFT_FORMATTER.parseDateTime(jpar.getValueAsString());
                         } catch (Exception e) {}
                         if (result == null) {
                             try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
index c4422db..750915e 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
@@ -20,7 +20,9 @@ package com.datasift.test;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -31,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  *
@@ -39,18 +43,11 @@ public class DatasiftSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
 
-
-
-
-    @Test @Ignore
+    @Test
     public void Tests()
     {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
         InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json");
         InputStreamReader isr = new InputStreamReader(is);
         BufferedReader br = new BufferedReader(isr);
@@ -60,6 +57,7 @@ public class DatasiftSerDeTest {
                 String line = br.readLine();
                 LOGGER.debug(line);
                 System.out.println(line);
+
                 Datasift ser = mapper.readValue(line, Datasift.class);
 
                 String de = mapper.writeValueAsString(ser);
@@ -68,7 +66,7 @@ public class DatasiftSerDeTest {
 
                 Datasift serde = mapper.readValue(de, Datasift.class);
 
-//                Assert.assertEquals(ser, serde);
+                Assert.assertEquals(ser, serde);
 
                 LOGGER.debug(mapper.writeValueAsString(serde));
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
deleted file mode 100644
index 015f4e9..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Test;
-
-import java.util.List;
-
-import static junit.framework.Assert.*;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessorTest {
-
-    private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
 "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu
 ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
 le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee  look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
 31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
 https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
-
-    @Test
-    public void testTypeConverterToString() {
-        final String ID = "1";
-        StreamsProcessor processor = new DatasiftTypeConverterProcessor(String.class);
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof String);
-        assertEquals(ID, resultDatum.getId());
-    }
-
-    @Test
-    public void testTypeConverterToActivity() {
-        final String ID = "1";
-        StreamsProcessor processor = new DatasiftTypeConverterProcessor(Activity.class);
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof Activity);
-        assertEquals(ID, resultDatum.getId());
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
index 5f9feed..162526b 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java
@@ -2,8 +2,10 @@ package org.apache.streams.datasift.serializer;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.junit.Test;
@@ -16,7 +18,8 @@ import static org.junit.Assert.assertNotNull;
 public class DatasiftActivitySerializerTest {
 
     private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer();
-    private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
 
     @Test
     public void testGeneralConversion() throws Exception {
@@ -42,6 +45,7 @@ public class DatasiftActivitySerializerTest {
             testGeneralConversion(line);
             testDeserNoNull(line);
             testDeserNoAddProps(line);
+
             System.out.println("ORIGINAL -> "+line);
             System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line)));
             System.out.println("NODE     -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
new file mode 100644
index 0000000..2004654
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Scanner;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class DatasiftEventClassifierTest {
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
+
+    @Test
+    public void testTwitterDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer);
+        }
+    }
+
+    @Test
+    public void testInstagramDetection() throws Exception {
+        Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt"));
+        String line = null;
+        while(scanner.hasNextLine()) {
+            line = scanner.nextLine();
+            Datasift datasift = MAPPER.readValue(line, Datasift.class);
+            assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class);
+            assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer);
+        }
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index edf4959..9c99a92 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -50,6 +50,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-processor-jackson</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
deleted file mode 100644
index fb4615f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements StreamsProcessor {
-
-    private final static String STREAMS_ID = "TwitterEventProcessor";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
-    private ObjectMapper mapper = new StreamsTwitterMapper();
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
-    public TwitterEventProcessor(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public TwitterEventProcessor( Class outClass) {
-        this(null, outClass);
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        Preconditions.checkNotNull(event);
-        Preconditions.checkNotNull(mapper);
-        Preconditions.checkNotNull(twitterJsonActivitySerializer);
-
-        if( outClass.equals( Activity.class )) {
-                LOGGER.debug("ACTIVITY");
-                result = twitterJsonActivitySerializer.deserialize(
-                        mapper.writeValueAsString(event));
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-            // no supported conversion were applied
-        if( result != null )
-            return result;
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        // first check for valid json
-        ObjectNode node = (ObjectNode) entry.getDocument();
-
-        LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass());
-
-        String json = null;
-        try {
-            json = mapper.writeValueAsString(node);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-
-        if( StringUtils.isNotEmpty(json)) {
-
-            // since data is coming from outside provider, we don't know what type the events are
-            Class inClass = TwitterEventClassifier.detectClass(json);
-
-            // if the target is string, just pass-through
-            if (java.lang.String.class.equals(outClass))
-                return Lists.newArrayList(new StreamsDatum(json));
-            else {
-                // convert to desired format
-                Object out = null;
-                try {
-                    out = convert(node, inClass, outClass);
-                } catch (ActivitySerializerException e) {
-                    LOGGER.warn("Failed deserializing", e);
-                    return Lists.newArrayList();
-                } catch (JsonProcessingException e) {
-                    LOGGER.warn("Failed parsing JSON", e);
-                    return Lists.newArrayList();
-                }
-
-                if (out != null && validate(out, outClass))
-                    return Lists.newArrayList(new StreamsDatum(out));
-            }
-        }
-
-        return Lists.newArrayList();
-
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = new StreamsJacksonMapper();
-        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
deleted file mode 100644
index 674eef1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
-    private ObjectMapper mapper = new StreamsTwitterMapper();
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    @Override
-    public void run() {
-
-        while(true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
-                if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                Thread.sleep(new Random().nextInt(100));
-
-                for( StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
-
-
-            } catch (Exception e) {
-                e.printStackTrace();
-
-            }
-        }
-    }
-
-    public StreamsDatum createStreamsDatum(User user) {
-        return new StreamsDatum(user, user.getIdStr());
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-        String item;
-        try {
-            // first check for valid json
-            // since data is coming from outside provider, we don't know what type the events are
-            if( entry.getDocument() instanceof String) {
-                item = (String) entry.getDocument();
-            } else {
-                item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
-            }
-
-            Class inClass = TwitterEventClassifier.detectClass(item);
-
-            User user;
-
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                Tweet tweet = mapper.readValue(item, Tweet.class);
-                user = tweet.getUser();
-                result.add(createStreamsDatum(user));
-            }
-            else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                Retweet retweet = mapper.readValue(item, Retweet.class);
-                user = retweet.getRetweetedStatus().getUser();
-                result.add(createStreamsDatum(user));
-            } else if ( inClass.equals( User.class )) {
-                LOGGER.debug("USER");
-                user = mapper.readValue(item, User.class);
-                result.add(createStreamsDatum(user));
-            } else {
-                return Lists.newArrayList();
-            }
-
-            return result;
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Error processing " + entry.toString());
-            return Lists.newArrayList();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
deleted file mode 100644
index 74cce27..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.processor;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTypeConverter implements StreamsProcessor {
-
-    public final static String STREAMS_ID = "TwitterTypeConverter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
-
-    private ObjectMapper mapper;
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonActivitySerializer twitterJsonActivitySerializer;
-
-    private int count = 0;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public TwitterTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
-        inQueue = inputQueue;
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            LOGGER.debug("ACTIVITY");
-            result = twitterJsonActivitySerializer.deserialize(
-                    mapper.writeValueAsString(event));
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-        // no supported conversion were applied
-        if( result != null ) {
-            count ++;
-            return result;
-        }
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        StreamsDatum result = null;
-
-        try {
-
-            Object item = entry.getDocument();
-            ObjectNode node;
-
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
-            if( item instanceof String ) {
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass)) {
-                    result = entry;
-                }
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
-
-                    // since data is coming from outside provider, we don't know what type the events are
-                    Class inClass = TwitterEventClassifier.detectClass((String) item);
-
-                    Object out = convert(node, inClass, outClass);
-
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
-                }
-
-            } else if( item instanceof ObjectNode ) {
-
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
-
-                // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
-
-                Object out = convert(node, inClass, outClass);
-
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
-    }
-
-    @Override
-    public void prepare(Object o) {
-        mapper = new StreamsTwitterMapper();
-        twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
index 432a047..2234739 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
@@ -18,41 +18,38 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.*;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Created by sblackmon on 12/13/13.
  */
-public class TwitterEventClassifier {
+public class TwitterEventClassifier implements Serializable {
 
-    public static Class detectClass( String json ) {
+    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
+    public static Class detectClass( String json ) {
         Preconditions.checkNotNull(json);
         Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
-//        try {
-//            JsonAssert.with(json).assertNull("$.delete");
-//        } catch( AssertionError ae ) {
-//            return Delete.class;
-//        }
-//
-//        try {
-//            JsonAssert.with(json).assertNull("$.retweeted_status");
-//        } catch( AssertionError ae ) {
-//            return Retweet.class;
-//        }
-//
-//        return Tweet.class;
-
         ObjectNode objectNode;
         try {
-            objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json);
+            objectNode = (ObjectNode) mapper.readTree(json);
         } catch (IOException e) {
             e.printStackTrace();
             return null;
@@ -72,4 +69,31 @@ public class TwitterEventClassifier {
         else
             return Tweet.class;
     }
+    public static ActivitySerializer bestSerializer( String json ) {
+
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+        ObjectNode objectNode;
+        try {
+            objectNode = (ObjectNode) mapper.readTree(json);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+
+        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
+            return TwitterJsonRetweetActivitySerializer.getInstance();
+        else if( objectNode.findValue("delete") != null )
+            return TwitterJsonDeleteActivitySerializer.getInstance();
+//        else if( objectNode.findValue("friends") != null ||
+//                objectNode.findValue("friends_str") != null )
+//            return FriendList.class;
+        else if( objectNode.findValue("target_object") != null )
+            return TwitterJsonUserstreameventActivitySerializer.getInstance();
+        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
+            return TwitterJsonUserActivitySerializer.getInstance();
+        else
+            return TwitterJsonTweetActivitySerializer.getInstance();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 2ed16ba..395bd95 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -40,16 +40,21 @@ import java.io.IOException;
 
 /**
  * Created by sblackmon on 3/27/14.
+ *
+ * @deprecated Use {@link StreamsJacksonMapper} instead.
  */
+@Deprecated
 public class StreamsTwitterMapper extends StreamsJacksonMapper {
 
-    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+    public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+
+    public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT);
 
     public static final Long getMillis(String dateTime) {
 
         // this function is for pig which doesn't handle exceptions well
         try {
-            Long result = TWITTER_FORMAT.parseMillis(dateTime);
+            Long result = TWITTER_FORMATTER.parseMillis(dateTime);
             return result;
         } catch( Exception e ) {
             return null;
@@ -73,7 +78,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
                     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
                         DateTime result = null;
                         try {
-                            result = TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+                            result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString());
                         } catch( Exception e ) { }
                         try {
                             result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index 5b5be96..d1f0de9 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -35,10 +35,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
 
     }
 
-    TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-    TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer();
+    private static final TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
 
     @Override
     public String serializationFormat() {
@@ -53,18 +54,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
     @Override
     public Activity deserialize(String serialized) throws ActivitySerializerException {
 
-        Class documentSubType = TwitterEventClassifier.detectClass(serialized);
+        ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized);
+        Activity activity = (serializer == null) ? null : serializer.deserialize(serialized); // a null serializer falls through to the "unrecognized type" check below instead of throwing NullPointerException
 
-        Activity activity;
-        if( documentSubType == Tweet.class )
-            activity = tweetActivitySerializer.deserialize(serialized);
-        else if( documentSubType == Retweet.class )
-            activity = retweetActivitySerializer.deserialize(serialized);
-        else if( documentSubType == Delete.class )
-            activity = deleteActivitySerializer.deserialize(serialized);
-        else if( documentSubType == User.class )
-            activity = userActivitySerializer.deserialize(serialized);
-        else throw new ActivitySerializerException("unrecognized type");
+        if( activity == null )
+            throw new ActivitySerializerException("unrecognized type");
 
         return activity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index cb1618a..b368f71 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -46,6 +46,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 */
 public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
 
+    private static final TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonDeleteActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 4f141bb..58cb769 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -37,6 +37,12 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
 
     }
 
+    private static final TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonRetweetActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index 8e58326..e6fc05f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -34,6 +34,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 
 public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
 
+    private static final TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonTweetActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
index 2ae5355..1bf935c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
@@ -36,6 +36,12 @@ public class TwitterJsonUserActivitySerializer implements ActivitySerializer<Str
 
     public TwitterJsonUserActivitySerializer() {}
 
+    private static final TwitterJsonUserActivitySerializer instance = new TwitterJsonUserActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonUserActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
index edaec01..e2832dd 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -44,6 +44,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
 */
 public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> {
 
+    private static final TwitterJsonUserstreameventActivitySerializer instance = new TwitterJsonUserstreameventActivitySerializer(); // shared instance, created once by the static initializer
+
+    public static TwitterJsonUserstreameventActivitySerializer getInstance() { // returns the shared instance declared above
+        return instance;
+    }
+
     @Override
     public String serializationFormat() {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index 2d18db9..6b62fe3 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -21,13 +21,15 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.TypeConverterProcessor;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.junit.Assert;
@@ -55,14 +57,13 @@ import static org.junit.Assert.assertThat;
 public class SimpleTweetTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
 t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
 e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
 DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
 img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
 371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-
-    //    @Ignore
     @Test
     public void Tests()
     {
@@ -100,7 +101,7 @@ public class SimpleTweetTest {
         }
 
         try {
-            TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class);
+            TypeConverterProcessor converter = new TypeConverterProcessor(String.class, Activity.class);
             converter.prepare(null);
             converter.process(new StreamsDatum(TWITTER_JSON));
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
index c7f6434..d6af4d9 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -21,7 +21,9 @@ package org.apache.streams.twitter.test;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
@@ -52,11 +54,11 @@ import static org.junit.Assert.assertThat;
 public class TweetActivitySerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-    //    @Ignore
     @Test
     public void Tests()
     {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index d0a6714..eba5fd0 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -54,11 +56,11 @@ import static org.junit.Assert.assertThat;
 public class TweetSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
-    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
-    //    @Ignore
     @Test
     public void Tests()
     {