You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/14 01:28:56 UTC

[3/9] incubator-streams git commit: omni-bus update

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
new file mode 100644
index 0000000..112b6a5
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivityConverter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import com.moreover.api.ObjectFactory;
+import org.apache.commons.lang.SerializationException;
+import org.apache.streams.data.util.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ */
+public class MoreoverXmlActivityConverter implements ActivityConverter<String> {
+
+    //JAXBContext is threadsafe (supposedly)
+    private final JAXBContext articleContext;
+    private final JAXBContext articlesContext;
+
+    public MoreoverXmlActivityConverter() {
+        articleContext = createContext(Article.class);
+        articlesContext = createContext(ArticlesResponse.class);
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "application/xml+vnd.moreover.com.v1";
+    }
+
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+    }
+
+    @Override
+    public Activity deserialize(String serialized) {
+        Article article = deserializeMoreover(serialized);
+        return MoreoverUtils.convert(article);
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = new LinkedList<Activity>();
+        for(String item : serializedList) {
+            ArticlesResponse response = deserializeMoreoverResponse(item);
+            for(Article article : response.getArticles().getArticle()) {
+                activities.add(MoreoverUtils.convert(article));
+            }
+        }
+        return activities;
+    }
+
+    private Article deserializeMoreover(String serialized){
+        try {
+            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
+            return (Article) unmarshaller.unmarshal(new StringReader(serialized));
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    private ArticlesResponse deserializeMoreoverResponse(String serialized){
+        try {
+            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
+            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    private JAXBContext createContext(Class articleClass) {
+        JAXBContext context;
+        try {
+            context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
+        }
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
deleted file mode 100644
index d60bcb8..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import com.moreover.api.ObjectFactory;
-import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
- */
-public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
-
-    //JAXBContext is threadsafe (supposedly)
-    private final JAXBContext articleContext;
-    private final JAXBContext articlesContext;
-
-    public MoreoverXmlActivitySerializer() {
-        articleContext = createContext(Article.class);
-        articlesContext = createContext(ArticlesResponse.class);
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/xml+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) {
-        Article article = deserializeMoreover(serialized);
-        return MoreoverUtils.convert(article);
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        List<Activity> activities = new LinkedList<Activity>();
-        for(String item : serializedList) {
-            ArticlesResponse response = deserializeMoreoverResponse(item);
-            for(Article article : response.getArticles().getArticle()) {
-                activities.add(MoreoverUtils.convert(article));
-            }
-        }
-        return activities;
-    }
-
-    private Article deserializeMoreover(String serialized){
-        try {
-            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
-            return (Article) unmarshaller.unmarshal(new StringReader(serialized));
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover data", e);
-        }
-    }
-
-    private ArticlesResponse deserializeMoreoverResponse(String serialized){
-        try {
-            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
-            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover data", e);
-        }
-    }
-
-    private JAXBContext createContext(Class articleClass) {
-        JAXBContext context;
-        try {
-            context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
-        } catch (JAXBException e) {
-            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
-        }
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
new file mode 100644
index 0000000..a9e8e3a
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivityConverterTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.util.JsonUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+import static org.junit.Assert.assertThat;
+
+public class MoreoverJsonActivityConverterTest {
+    JsonNode json;
+    ActivityConverter serializer = new MoreoverJsonActivityConverter();
+    ObjectMapper mapper;
+
+    @Before
+    public void setup() throws IOException {
+        json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
+
+        mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+    }
+
+    @Test
+    public void loadData() throws Exception {
+        for (JsonNode item : json) {
+            test(serializer.deserialize(getString(item)));
+        }
+    }
+
+
+    private String getString(JsonNode jsonNode)  {
+        try {
+            return new ObjectMapper().writeValueAsString(jsonNode);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
deleted file mode 100644
index f5d66b1..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.util.JsonUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.regex.Pattern;
-
-import static java.util.regex.Pattern.matches;
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-public class MoreoverJsonActivitySerializerTest {
-    JsonNode json;
-    ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
-    ObjectMapper mapper;
-
-    @Before
-    public void setup() throws IOException {
-        json = JsonUtil.getFromFile("classpath:org/apache/streams/data/moreover.json");
-
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        for (JsonNode item : json) {
-            test(serializer.deserialize(getString(item)));
-        }
-    }
-
-
-    private String getString(JsonNode jsonNode)  {
-        try {
-            return new ObjectMapper().writeValueAsString(jsonNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
new file mode 100644
index 0000000..9834032
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivityConverterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.apache.streams.data.util.MoreoverTestUtil.test;
+
+public class MoreoverXmlActivityConverterTest {
+    ActivityConverter serializer;
+    private String xml;
+
+    @Before
+    public void setup() throws IOException {
+        serializer = new MoreoverXmlActivityConverter();
+        xml = loadXml();
+    }
+
+    @Test
+    public void loadData() throws Exception {
+        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+        for (Activity activity : activities) {
+            test(activity);
+        }
+    }
+
+    private String loadXml() throws IOException {
+        StringWriter writer = new StringWriter();
+        InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml");
+        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+        return writer.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
deleted file mode 100644
index dbebee2..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-public class MoreoverXmlActivitySerializerTest {
-    ActivitySerializer serializer;
-    private String xml;
-
-    @Before
-    public void setup() throws IOException {
-        serializer = new MoreoverXmlActivitySerializer();
-        xml = loadXml();
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
-        for (Activity activity : activities) {
-            test(activity);
-        }
-    }
-
-    private String loadXml() throws IOException {
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = this.getClass().getResourceAsStream("moreover.xml");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-        return writer.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
index 339b922..2c86fa0 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/processor/RssTypeConverter.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -39,7 +39,7 @@ public class RssTypeConverter implements StreamsProcessor{
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RssTypeConverter.class);
 
-    private SyndEntryActivitySerializer serializer;
+    private SyndEntryActivityConverter serializer;
     private int successCount = 0;
     private int failCount = 0;
 
@@ -62,7 +62,7 @@ public class RssTypeConverter implements StreamsProcessor{
 
     @Override
     public void prepare(Object o) {
-        this.serializer = new SyndEntryActivitySerializer();
+        this.serializer = new SyndEntryActivityConverter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
index 75d275d..347f593 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssEventProcessor.java
@@ -19,11 +19,10 @@
 package org.apache.streams.rss.provider;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.sun.syndication.feed.synd.SyndEntry;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
 import org.apache.streams.rss.serializer.SyndEntrySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ public class RssEventProcessor implements Runnable {
     private Class inClass;
     private Class outClass;
 
-    private SyndEntryActivitySerializer syndEntryActivitySerializer = new SyndEntryActivitySerializer();
+    private SyndEntryActivityConverter syndEntryActivitySerializer = new SyndEntryActivityConverter();
     private SyndEntrySerializer syndEntrySerializer = new SyndEntrySerializer();
 
     public final static String TERMINATE = new String("TERMINATE");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
new file mode 100644
index 0000000..9801c30
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivityConverter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.serializer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.*;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class SyndEntryActivityConverter implements ActivityConverter<ObjectNode> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivityConverter.class);
+
+    private boolean includeRomeExtension;
+
+    public SyndEntryActivityConverter() {
+        this(true);
+    }
+
+    public SyndEntryActivityConverter(boolean includeRomeExtension) {
+        this.includeRomeExtension = includeRomeExtension;
+    }
+
+
+    @Override
+    public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
+        List<Activity> result = Lists.newLinkedList();
+        for (ObjectNode node : objectNodes) {
+            result.add(deserialize(node));
+        }
+        return result;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "application/streams-provider-rss";
+    }
+
+    @Override
+    public ObjectNode serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Rome");
+    }
+
+    @Override
+    public Activity deserialize(ObjectNode syndEntry) {
+        return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
+    }
+
+    public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
+        Preconditions.checkNotNull(entry);
+
+        Activity activity = new Activity();
+        Provider provider = buildProvider(entry);
+        Actor actor = buildActor(entry);
+        ActivityObject activityObject = buildActivityObject(entry);
+
+        activityObject.setUrl(provider.getUrl());
+        activityObject.setAuthor(actor.getAuthor());
+
+        activity.setUrl(provider.getUrl());
+        activity.setProvider(provider);
+        activity.setActor(actor);
+        activity.setVerb("post");
+        activity.setId("id:rss:post:" + activity.getUrl());
+
+        JsonNode published = entry.get("publishedDate");
+        if (published != null) {
+            try {
+                activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
+            } catch (Exception e) {
+                LOGGER.warn("Failed to parse date : {}", published.textValue());
+
+                DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
+                activity.setPublished(now);
+            }
+        }
+
+        activity.setUpdated(activityObject.getUpdated());
+        activity.setObject(activityObject);
+
+        if (withExtension) {
+            activity = addRomeExtension(activity, entry);
+        }
+
+        return activity;
+    }
+
+    /**
+     * Given an RSS entry, extract the author and actor information and return it
+     * in an actor object
+     *
+     * @param entry
+     * @return
+     */
+    private Actor buildActor(ObjectNode entry) {
+        Author author = new Author();
+        Actor actor = new Actor();
+
+        if (entry.get("author") != null) {
+            author.setId(entry.get("author").textValue());
+            author.setDisplayName(entry.get("author").textValue());
+
+            actor.setAuthor(author);
+            String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
+
+            actor.setId("id:rss:" + uriToSet + ":" + author.getId());
+            actor.setDisplayName(author.getDisplayName());
+        }
+
+        return actor;
+    }
+
+    /**
+     * Given an RSS object, build the ActivityObject
+     *
+     * @param entry
+     * @return
+     */
+    private ActivityObject buildActivityObject(ObjectNode entry) {
+        ActivityObject activityObject = new ActivityObject();
+
+        JsonNode summary = entry.get("description");
+        if (summary != null)
+            activityObject.setSummary(summary.textValue());
+        else if((summary = entry.get("title")) != null) {
+            activityObject.setSummary(summary.textValue());
+        }
+
+        return activityObject;
+    }
+
+    /**
+     * Given an RSS object, build and return the Provider object
+     *
+     * @param entry
+     * @return
+     */
+    private Provider buildProvider(ObjectNode entry) {
+        Provider provider = new Provider();
+
+        String link = null;
+        String uri = null;
+        String resourceLocation = null;
+
+        if (entry.get("link") != null)
+            link = entry.get("link").textValue();
+        if (entry.get("uri") != null)
+            uri = entry.get("uri").textValue();
+
+        /**
+         * Order of precedence for resourceLocation selection
+         *
+         * 1. Valid URI
+         * 2. Valid Link
+         * 3. Non-null URI
+         * 4. Non-null Link
+         */
+        if(isValidResource(uri))
+            resourceLocation = uri;
+        else if(isValidResource(link))
+            resourceLocation = link;
+        else if(uri != null || link != null) {
+            resourceLocation = (uri != null) ? uri : link;
+        }
+
+        provider.setId("id:providers:rss");
+        provider.setUrl(resourceLocation);
+        provider.setDisplayName("RSS");
+
+        return provider;
+    }
+
+    /**
+     * Tests whether or not the passed in resource looks like a valid URI.
+     * A null resource is never valid (the prefix checks are null-guarded).
+     * @param resource candidate location, may be null
+     * @return true if resource is non-null and starts with "http" or "www"
+     */
+    private boolean isValidResource(String resource) {
+        // Parenthesized: && binds tighter than ||, so the null guard must cover both checks.
+        return resource != null && (resource.startsWith("http") || resource.startsWith("www"));
+    }
+
+    /**
+     * Given an RSS object and an existing activity,
+     * add the Rome extension to that activity and return it
+     *
+     * @param activity
+     * @param entry
+     * @return
+     */
+    private Activity addRomeExtension(Activity activity, ObjectNode entry) {
+        ObjectMapper mapper = new StreamsJacksonMapper();
+        ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
+        ObjectNode extensions = JsonNodeFactory.instance.objectNode();
+
+        extensions.put("rome", entry);
+        activityRoot.put("extensions", extensions);
+
+        activity = mapper.convertValue(activityRoot, Activity.class);
+
+        return activity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
deleted file mode 100644
index 06839f3..0000000
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.rss.serializer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.*;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(SyndEntryActivitySerializer.class);
-
-    private boolean includeRomeExtension;
-
-    public SyndEntryActivitySerializer() {
-        this(true);
-    }
-
-    public SyndEntryActivitySerializer(boolean includeRomeExtension) {
-        this.includeRomeExtension = includeRomeExtension;
-    }
-
-
-    @Override
-    public List<Activity> deserializeAll(List<ObjectNode> objectNodes) {
-        List<Activity> result = Lists.newLinkedList();
-        for (ObjectNode node : objectNodes) {
-            result.add(deserialize(node));
-        }
-        return result;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/streams-provider-rss";
-    }
-
-    @Override
-    public ObjectNode serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Rome");
-    }
-
-    @Override
-    public Activity deserialize(ObjectNode syndEntry) {
-        return deserializeWithRomeExtension(syndEntry, this.includeRomeExtension);
-    }
-
-    public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
-        Preconditions.checkNotNull(entry);
-
-        Activity activity = new Activity();
-        Provider provider = buildProvider(entry);
-        Actor actor = buildActor(entry);
-        ActivityObject activityObject = buildActivityObject(entry);
-
-        activityObject.setUrl(provider.getUrl());
-        activityObject.setAuthor(actor.getAuthor());
-
-        activity.setUrl(provider.getUrl());
-        activity.setProvider(provider);
-        activity.setActor(actor);
-        activity.setVerb("post");
-        activity.setId("id:rss:post:" + activity.getUrl());
-
-        JsonNode published = entry.get("publishedDate");
-        if (published != null) {
-            try {
-                activity.setPublished(RFC3339Utils.parseToUTC(published.textValue()));
-            } catch (Exception e) {
-                LOGGER.warn("Failed to parse date : {}", published.textValue());
-
-                DateTime now = DateTime.now().withZone(DateTimeZone.UTC);
-                activity.setPublished(now);
-            }
-        }
-
-        activity.setUpdated(activityObject.getUpdated());
-        activity.setObject(activityObject);
-
-        if (withExtension) {
-            activity = addRomeExtension(activity, entry);
-        }
-
-        return activity;
-    }
-
-    /**
-     * Given an RSS entry, extra out the author and actor information and return it
-     * in an actor object
-     *
-     * @param entry
-     * @return
-     */
-    private Actor buildActor(ObjectNode entry) {
-        Author author = new Author();
-        Actor actor = new Actor();
-
-        if (entry.get("author") != null) {
-            author.setId(entry.get("author").textValue());
-            author.setDisplayName(entry.get("author").textValue());
-
-            actor.setAuthor(author);
-            String uriToSet = entry.get("rssFeed") != null ? entry.get("rssFeed").asText() : null;
-
-            actor.setId("id:rss:" + uriToSet + ":" + author.getId());
-            actor.setDisplayName(author.getDisplayName());
-        }
-
-        return actor;
-    }
-
-    /**
-     * Given an RSS object, build the ActivityObject
-     *
-     * @param entry
-     * @return
-     */
-    private ActivityObject buildActivityObject(ObjectNode entry) {
-        ActivityObject activityObject = new ActivityObject();
-
-        JsonNode summary = entry.get("description");
-        if (summary != null)
-            activityObject.setSummary(summary.textValue());
-        else if((summary = entry.get("title")) != null) {
-            activityObject.setSummary(summary.textValue());
-        }
-
-        return activityObject;
-    }
-
-    /**
-     * Given an RSS object, build and return the Provider object
-     *
-     * @param entry
-     * @return
-     */
-    private Provider buildProvider(ObjectNode entry) {
-        Provider provider = new Provider();
-
-        String link = null;
-        String uri = null;
-        String resourceLocation = null;
-
-        if (entry.get("link") != null)
-            link = entry.get("link").textValue();
-        if (entry.get("uri") != null)
-            uri = entry.get("uri").textValue();
-
-        /**
-         * Order of precedence for resourceLocation selection
-         *
-         * 1. Valid URI
-         * 2. Valid Link
-         * 3. Non-null URI
-         * 4. Non-null Link
-         */
-        if(isValidResource(uri))
-            resourceLocation = uri;
-        else if(isValidResource(link))
-            resourceLocation = link;
-        else if(uri != null || link != null) {
-            resourceLocation = (uri != null) ? uri : link;
-        }
-
-        provider.setId("id:providers:rss");
-        provider.setUrl(resourceLocation);
-        provider.setDisplayName("RSS");
-
-        return provider;
-    }
-
-    /**
-     * Tests whether or not the passed in resource is a valid URI
-     * @param resource
-     * @return boolean of whether or not the resource is valid
-     */
-    private boolean isValidResource(String resource) {
-        if(resource != null && resource.startsWith("http") || resource.startsWith("www"))
-            return true;
-        return false;
-    }
-
-    /**
-     * Given an RSS object and an existing activity,
-     * add the Rome extension to that activity and return it
-     *
-     * @param activity
-     * @param entry
-     * @return
-     */
-    private Activity addRomeExtension(Activity activity, ObjectNode entry) {
-        ObjectMapper mapper = new StreamsJacksonMapper();
-        ObjectNode activityRoot = mapper.convertValue(activity, ObjectNode.class);
-        ObjectNode extensions = JsonNodeFactory.instance.objectNode();
-
-        extensions.put("rome", entry);
-        activityRoot.put("extensions", extensions);
-
-        activity = mapper.convertValue(activityRoot, Activity.class);
-
-        return activity;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
index fd9a996..446f998 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerizlizerTest.java
@@ -26,7 +26,7 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.pojo.json.Author;
 import org.apache.streams.pojo.json.Provider;
-import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
+import org.apache.streams.rss.serializer.SyndEntryActivityConverter;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class SyndEntryActivitySerizlizerTest {
         List<Activity> activities = Lists.newLinkedList();
         List<ObjectNode> objects = Lists.newLinkedList();
 
-        SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
+        SyndEntryActivityConverter serializer = new SyndEntryActivityConverter();
 
         while(scanner.hasNext()) {
             String line = scanner.nextLine();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 3880135..1c0e8cb 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -50,9 +50,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-processor-jackson</artifactId>
+            <artifactId>streams-converters</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 4ca73df..5ad811d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -31,7 +31,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterConfigurator;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterDocumentClassifier;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +111,7 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
     }
 
     protected void replace(Activity doc, String json) throws java.io.IOException, ActivitySerializerException {
-        Class documentSubType = TwitterEventClassifier.detectClass(json);
+        Class documentSubType = TwitterDocumentClassifier.getInstance().detectClass(json);
         Object object = mapper.readValue(json, documentSubType);
 
         if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
index 674eef1..bffdef0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java
@@ -26,7 +26,7 @@ import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.TwitterDocumentClassifier;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +90,7 @@ public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
                 item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
             }
 
-            Class inClass = TwitterEventClassifier.detectClass(item);
+            Class inClass = TwitterDocumentClassifier.getInstance().detectClass(item);
 
             User user;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
deleted file mode 100644
index 2234739..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.*;
-import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * Created by sblackmon on 12/13/13.
- */
-public class TwitterEventClassifier implements Serializable {
-
-    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
-
-    public static Class detectClass( String json ) {
-        Preconditions.checkNotNull(json);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-        ObjectNode objectNode;
-        try {
-            objectNode = (ObjectNode) mapper.readTree(json);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-
-        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
-            return Retweet.class;
-        else if( objectNode.findValue("delete") != null )
-            return Delete.class;
-        else if( objectNode.findValue("friends") != null ||
-                objectNode.findValue("friends_str") != null )
-            return FriendList.class;
-        else if( objectNode.findValue("target_object") != null )
-            return UserstreamEvent.class;
-        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
-            return User.class;
-        else
-            return Tweet.class;
-    }
-    public static ActivitySerializer bestSerializer( String json ) {
-
-        Preconditions.checkNotNull(json);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
-
-        ObjectNode objectNode;
-        try {
-            objectNode = (ObjectNode) mapper.readTree(json);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-
-        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
-            return TwitterJsonRetweetActivitySerializer.getInstance();
-        else if( objectNode.findValue("delete") != null )
-            return TwitterJsonDeleteActivitySerializer.getInstance();
-//        else if( objectNode.findValue("friends") != null ||
-//                objectNode.findValue("friends_str") != null )
-//            return FriendList.class;
-        else if( objectNode.findValue("target_object") != null )
-            return TwitterJsonUserstreameventActivitySerializer.getInstance();
-        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
-            return TwitterJsonUserActivitySerializer.getInstance();
-        else
-            return TwitterJsonTweetActivitySerializer.getInstance();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index bd67765..45bd071 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -31,7 +32,6 @@ import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.Twitter;
 import twitter4j.TwitterException;
 import twitter4j.TwitterFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
new file mode 100644
index 0000000..f14c9f5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityConverterResolver;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.FriendList;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Resolves a Twitter document class to its converter class. Created by sblackmon on 12/13/13.
+ */
+public class TwitterConverterResolver implements ActivityConverterResolver {
+
+    public TwitterConverterResolver() {
+
+    }
+    // Shared instance handed out by getInstance(); created when the class is initialized.
+    private static TwitterConverterResolver instance = new TwitterConverterResolver();
+
+    public static TwitterConverterResolver getInstance() {
+        return instance;
+    }
+    // NOTE(review): this mapper is not referenced anywhere in this class - confirm it is needed.
+    private static ObjectMapper mapper = new StreamsJacksonMapper(StreamsTwitterMapper.TWITTER_FORMAT);
+    /** Returns the converter class for documentClass; any unlisted class gets the tweet converter. */
+    @Override
+    public Class bestSerializer(Class documentClass) throws ActivitySerializerException {
+        // Identity comparison against the known pojo classes; FriendList is not listed and falls through.
+        if (documentClass == Retweet.class)
+            return TwitterJsonRetweetActivityConverter.class;
+        else if (documentClass == Delete.class)
+            return TwitterJsonDeleteActivityConverter.class;
+        else if (documentClass == User.class)
+            return TwitterJsonUserActivityConverter.class;
+        else if (documentClass == UserstreamEvent.class)
+            return TwitterJsonUserstreameventActivityConverter.class;
+        else return TwitterJsonTweetActivityConverter.class;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
new file mode 100644
index 0000000..b9ca789
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.DocumentClassifier;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.pojo.*;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Classifies a Twitter JSON string by the fields it contains. Created by sblackmon on 12/13/13.
+ */
+public class TwitterDocumentClassifier implements DocumentClassifier {
+
+    public TwitterDocumentClassifier() {
+
+    }
+    // Shared instance handed out by getInstance(); created when the class is initialized.
+    private static TwitterDocumentClassifier instance = new TwitterDocumentClassifier();
+
+    public static TwitterDocumentClassifier getInstance() {
+        return instance;
+    }
+    // Mapper used by detectClass to parse the incoming JSON string into a tree.
+    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+    /** Returns the twitter pojo class matching the JSON string in document, or null if reading it fails. */
+    public Class detectClass(Object document) {
+        // Only non-null, non-empty String documents are accepted; anything else fails a precondition.
+        Preconditions.checkNotNull(document);
+        Preconditions.checkArgument(document instanceof String);
+
+        String json = (String)document;
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+        // NOTE(review): the cast presumably fails for JSON that is not an object (e.g. an array) - confirm.
+        ObjectNode objectNode;
+        try {
+            objectNode = (ObjectNode) mapper.readTree(json);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+        // Checks run in order, so the first matching field decides the class; Tweet is the default.
+        if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null)
+            return Retweet.class;
+        else if( objectNode.findValue("delete") != null )
+            return Delete.class;
+        else if( objectNode.findValue("friends") != null ||
+                objectNode.findValue("friends_str") != null )
+            return FriendList.class;
+        else if( objectNode.findValue("target_object") != null )
+            return UserstreamEvent.class;
+        else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null)
+            return User.class;
+        else
+            return Tweet.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
new file mode 100644
index 0000000..3e123b6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivityConverter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityConverterFactory;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.util.List;
+import java.io.Serializable;
+
+/*
+ * Now that we have ActivityConverterProcessor, this shouldn't be needed.
+ */
+@Deprecated
+public class TwitterJsonActivityConverter implements ActivityConverter<String>, Serializable
+{
+
+    public TwitterJsonActivityConverter() {
+
+    }
+    // Shared instance handed out by getInstance(); created when the class is initialized.
+    private static TwitterJsonActivityConverter instance = new TwitterJsonActivityConverter();
+
+    public static TwitterJsonActivityConverter getInstance() {
+        return instance;
+    }
+    /** No format name is declared for this converter; always returns null. */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+    /** Not supported: always throws NotImplementedException. */
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+    /** Classifies the JSON string, converts it to the detected type and hands it to a converter. */
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+        // NOTE(review): converterClass holds the detected document class (detectClass may return null).
+        Class converterClass = TwitterDocumentClassifier.getInstance().detectClass(serialized);
+        // NOTE(review): the factory is given the document class, not a converter class - confirm intended.
+        ActivityConverter converter = ActivityConverterFactory.getInstance(converterClass);
+
+        Object typedObject = TypeConverterUtil.convert(serialized, converterClass);
+
+        Activity activity = converter.deserialize(typedObject);
+
+        if( activity == null )
+            throw new ActivitySerializerException("unrecognized type");
+
+        return activity;
+    }
+    /** Not supported: always throws NotImplementedException. */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
deleted file mode 100644
index d1f0de9..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.*;
-import org.apache.streams.twitter.provider.TwitterEventClassifier;
-
-import java.util.List;
-import java.io.Serializable;
-
-public class TwitterJsonActivitySerializer implements ActivitySerializer<String>, Serializable
-{
-
-    public TwitterJsonActivitySerializer() {
-
-    }
-
-    private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer();
-
-    public static TwitterJsonActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public String serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Activity deserialize(String serialized) throws ActivitySerializerException {
-
-        ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized);
-        Activity activity = serializer.deserialize(serialized);
-
-        if( activity == null )
-            throw new ActivitySerializerException("unrecognized type");
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
new file mode 100644
index 0000000..8d8da28
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivityConverter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+
+/**
+ * Converts Twitter {@link Delete} events into {@link Activity} objects; the Activity is
+ * populated by the statically imported {@code TwitterActivityUtil.updateActivity}.
+ */
+public class TwitterJsonDeleteActivityConverter implements ActivityConverter<Delete>, Serializable {
+
+    private static final TwitterJsonDeleteActivityConverter instance = new TwitterJsonDeleteActivityConverter();
+
+    public static TwitterJsonDeleteActivityConverter getInstance() {
+        return instance;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /** Not supported; always throws {@link NotImplementedException}. */
+    @Override
+    public Delete serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    /** Same as {@link #convert(Delete)}, so callers of the interface method get an Activity, not null. */
+    @Override
+    public Activity deserialize(Delete serialized) throws ActivitySerializerException {
+        return convert(serialized);
+    }
+
+    /** Batch conversion is not implemented; currently always returns {@code null}. */
+    @Override
+    public List<Activity> deserializeAll(List<Delete> serializedList) {
+        return null;
+    }
+
+    /** Creates a new Activity and passes it, with {@code delete}, to {@code updateActivity}. */
+    public Activity convert(Delete delete) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        updateActivity(delete, activity);
+        return activity;
+    }
+
+    /** Placeholder; currently always returns {@code null}. */
+    public ActivityObject buildTarget(Tweet tweet) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
deleted file mode 100644
index b368f71..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Tweet;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable {
-
-    private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer();
-
-    public static TwitterJsonDeleteActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public String serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Activity deserialize(String serialized) throws ActivitySerializerException {
-        return null;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        return null;
-    }
-
-    public Activity convert(ObjectNode event) throws ActivitySerializerException {
-
-        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
-        Delete delete = null;
-        try {
-            delete = mapper.treeToValue(event, Delete.class);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-
-        Activity activity = new Activity();
-        updateActivity(delete, activity);
-        return activity;
-    }
-
-    public ActivityObject buildTarget(Tweet tweet) {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
new file mode 100644
index 0000000..4b64932
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivityConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Retweet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+/** Turns Twitter {@link Retweet} objects into {@link Activity} objects. */
+public class TwitterJsonRetweetActivityConverter implements ActivityConverter<Retweet>, Serializable {
+
+    private static TwitterJsonRetweetActivityConverter instance = new TwitterJsonRetweetActivityConverter();
+
+    public TwitterJsonRetweetActivityConverter() {
+    }
+
+    public static TwitterJsonRetweetActivityConverter getInstance() {
+        return instance;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /** The reverse mapping is not provided; the result is always {@code null}. */
+    @Override
+    public Retweet serialize(Activity deserialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    /** Creates a new Activity and passes it, with the retweet, to {@code updateActivity}. */
+    @Override
+    public Activity deserialize(Retweet retweet) throws ActivitySerializerException {
+        final Activity converted = new Activity();
+        updateActivity(retweet, converted);
+        return converted;
+    }
+
+    /** Converts the elements in list order; an element whose conversion throws is left out. */
+    @Override
+    public List<Activity> deserializeAll(List<Retweet> serializedList) {
+        final List<Activity> activities = Lists.newArrayList();
+        for( final Retweet retweet : serializedList ) {
+            try {
+                activities.add(deserialize(retweet));
+            } catch (ActivitySerializerException ignored) {} // best-effort: skip this element
+        }
+        return activities;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
deleted file mode 100644
index 58cb769..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<String>, Serializable {
-
-    public TwitterJsonRetweetActivitySerializer() {
-
-    }
-
-    private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer();
-
-    public static TwitterJsonRetweetActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public String serialize(Activity deserialized) throws ActivitySerializerException {
-        return null;
-    }
-
-    @Override
-    public Activity deserialize(String event) throws ActivitySerializerException {
-
-        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
-        Retweet retweet = null;
-        try {
-            retweet = mapper.readValue(event, Retweet.class);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        Activity activity = new Activity();
-        updateActivity(retweet, activity);
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
new file mode 100644
index 0000000..5cd1075
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivityConverter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
+
+/** Turns Twitter {@link Tweet} objects into {@link Activity} objects. */
+public class TwitterJsonTweetActivityConverter implements ActivityConverter<Tweet>, Serializable {
+
+    private static TwitterJsonTweetActivityConverter instance = new TwitterJsonTweetActivityConverter();
+
+    public static TwitterJsonTweetActivityConverter getInstance() {
+        return instance;
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /** Not supported; always throws {@link NotImplementedException}. */
+    @Override
+    public Tweet serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    /** Creates a new Activity and passes it, with the tweet, to {@code updateActivity}. */
+    @Override
+    public Activity deserialize(Tweet tweet) throws ActivitySerializerException {
+        final Activity converted = new Activity();
+        updateActivity(tweet, converted);
+        return converted;
+    }
+
+    /** Converts the elements in list order; an element whose conversion throws is left out. */
+    @Override
+    public List<Activity> deserializeAll(List<Tweet> serializedList) {
+        final List<Activity> activities = Lists.newArrayList();
+        for( final Tweet tweet : serializedList ) {
+            try {
+                activities.add(deserialize(tweet));
+            } catch (ActivitySerializerException ignored) {} // best-effort: skip this element
+        }
+        return activities;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
deleted file mode 100644
index e6fc05f..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Tweet;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*;
-
-public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable {
-
-    private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer();
-
-    public static TwitterJsonTweetActivitySerializer getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public String serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public Activity deserialize(String serialized) throws ActivitySerializerException {
-
-        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
-        Tweet tweet = null;
-        try {
-            tweet = mapper.readValue(serialized, Tweet.class);
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        Activity activity = new Activity();
-
-        updateActivity(tweet, activity);
-
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        return null;
-    }
-}