You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/31 21:24:13 UTC

[1/2] attempting to fix jackson date deserialization; setting up a pig runtime test capability

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning 5d0f6bc42 -> 9edf1766f


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
new file mode 100644
index 0000000..fc036ee
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
@@ -0,0 +1 @@
+159475541894897679	twitter,statuses/user_timeline	1384499359006	{"id":"id:twitter:share:159475541894897679","actor":{"id":"id:twitter:27552112","displayName":"rmedinaflores","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"verb":"share","object":{"id":"159470076259602432","objectType":"tweet","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"published":{"year":2012,"era":1,"dayOfMonth":17,"dayOfWeek":2,"dayOfYear":17,"weekOfWeekyear":3,"weekyear":2012,"monthOfYear":1,"yearOfEra":2012,"yearOfCentury":12,"centuryOfEra":20,"millisOfSecond":0,"millisOfDay":69706000,"secondOfMinute":46,"secondOfDay":69706,"minuteOfHour":21,"minuteOfDay":1161,"hourOfDay":19,"zone":{"fixed":false,"uncachedZone":{"cachable":true,"fixed":false,"id":"America/Los_Angeles"},"id":"America/Los_Angeles"},"millis":1326856906000,"chronology":{"zone":{"fixed":false,"uncachedZone":{"cachable":true,"fixed":false,"id":"America/Los_Angeles"},"id":"America/Los_Angeles"}},"afterNow":f
 alse,"beforeNow":true,"equalNow":false},"provider":{"id":"id:providers:twitter","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"title":"","content":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","url":"http://twitter.com/159475541894897679","links":["http://ti.me/zYyEtD"],"extensions":{"twitter":{"retweeted_status":{"text":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","retweeted":false,"truncated":false,"entities":{"user_mentions":[{"id":245888431,"name":"TIME Moneyland","indices":[106,120],"screen_name":"TIMEMoneyland","id_str":"245888431","additionalProperties":{}}],"hashtags":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[80,100],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi","additionalProperties":{}}],"additionalProperties":{"symbols":[]},"symbols":[]},"id":159470076259602432,"sour
 ce":"<a href=\"http://www.hootsuite.com\" rel=\"nofollow\">HootSuite</a>","lang":"en","favorited":false,"possibly_sensitive":false,"created_at":"20120117T190003.000-0800","retweet_count":71,"id_str":"159470076259602432","user":{"location":"","default_profile":false,"statuses_count":70754,"profile_background_tile":true,"lang":"en","profile_link_color":"1B4F89","id":14293310,"protected":false,"favourites_count":59,"profile_text_color":"000000","verified":true,"description":"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.","contributors_enabled":false,"name":"TIME.com","profile_sidebar_border_color":"000000","profile_background_color":"CC0000","created_at":"20080403T065430.000-0700","default_profile_image":false,"followers_count":5146268,"geo_enabled":false,"profile_image_url_https":"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","profile_background_image_url":"http://a0
 .twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","follow_request_sent":false,"url":"http://t.co/4aYbUuAeSh","utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","profile_use_background_image":true,"friends_count":742,"profile_sidebar_fill_color":"D9D9D9","screen_name":"TIME","id_str":"14293310","profile_image_url":"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","is_translator":false,"listed_count":76944,"additionalProperties":{"following":false,"notifications":false,"entities":{"description":{"urls":[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462"},"following":false,"notifications":false,"entities":{"description":{"urls"
 :[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462"},"additionalProperties":{"geo":null,"favorite_count":14,"place":null},"geo":null,"favorite_count":14,"place":null},"additionalProperties":{"geo":null,"favorite_count":0,"place":null},"text":"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","retweeted":false,"truncated":false,"entities":{"user_mentions":[{"id":14293310,"name":"TIME.com","indices":[3,8],"screen_name":"TIME","id_str":"14293310","additionalProperties":{}},{"id":245888431,"name":"TIME Moneyland","indices":[116,130],"screen_name":"TIMEMoneyland","id_str":"245888431","additionalProperties":{}}],"hashtags":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[90,110],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"
 ,"additionalProperties":{}}],"additionalProperties":{"symbols":[]},"symbols":[]},"id":159475541894897679,"source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","lang":"en","favorited":false,"possibly_sensitive":false,"created_at":"20120117T192146.000-0800","retweet_count":71,"id_str":"159475541894897679","user":{"location":"","default_profile":false,"statuses_count":5053,"profile_background_tile":true,"lang":"en","profile_link_color":"738D84","id":27552112,"protected":false,"favourites_count":52,"profile_text_color":"97CEC9","verified":false,"description":"","contributors_enabled":false,"name":"rafael medina-flores","profile_sidebar_border_color":"A9AC00","profile_background_color":"C5EFE3","created_at":"20090329T182155.000-0700","default_profile_image":false,"followers_count":963,"geo_enabled":true,"profile_image_url_https":"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","profile_background_image_url":"http://a0.twimg.co
 m/profile_background_images/167479660/trireme.jpg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg","follow_request_sent":false,"utc_offset":-25200,"time_zone":"Mountain Time (US & Canada)","profile_use_background_image":true,"friends_count":1800,"profile_sidebar_fill_color":"5C4F3C","screen_name":"rmedinaflores","id_str":"27552112","profile_image_url":"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","is_translator":false,"listed_count":50,"additionalProperties":{"following":false,"notifications":false,"entities":{"description":{"urls":[]}}},"following":false,"notifications":false,"entities":{"description":{"urls":[]}}},"geo":null,"favorite_count":0,"place":null},"location":{"id":"id:twitter:159475541894897679","coordinates":null}}}


[2/2] git commit: attempting to fix jackson date deserialization; setting up a pig runtime test capability

Posted by sb...@apache.org.
attempting to fix jackson date deserialization
setting up a pig runtime test capability


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9edf1766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9edf1766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9edf1766

Branch: refs/heads/springcleaning
Commit: 9edf1766f3f4b11f2353a15c9d76368a522f30e0
Parents: 5d0f6bc
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Mar 31 14:23:22 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Mar 31 14:23:36 2014 -0500

----------------------------------------------------------------------
 .../streams/urls/LinkUnwinderProcessor.java     | 85 +++++++++++++++-----
 .../streams/urls/TestLinkUnwinderProcessor.java | 30 +++++++
 .../processor/TwitterEventProcessor.java        | 13 ++-
 .../twitter/processor/TwitterTypeConverter.java |  6 +-
 .../serializer/StreamsTwitterMapper.java        | 49 +++++++++++
 .../TwitterJsonActivitySerializer.java          | 31 +------
 .../TwitterJsonDeleteActivitySerializer.java    |  4 +-
 .../TwitterJsonRetweetActivitySerializer.java   |  6 +-
 .../TwitterJsonTweetActivitySerializer.java     |  7 +-
 .../twitter/test/TweetActivitySerDeTest.java    |  8 +-
 .../streams/twitter/test/TweetSerDeTest.java    |  2 +-
 .../ActivityDeserializerException.java          | 27 +++++++
 .../jackson/StreamsDateTimeDeserializer.java    | 27 ++++++-
 .../jackson/StreamsDateTimeSerializer.java      |  6 +-
 .../streams/jackson/StreamsJacksonMapper.java   | 40 +++++++++
 .../streams/jackson/StreamsJacksonModule.java   |  2 +
 streams-runtimes/streams-runtime-local/pom.xml  | 11 +++
 .../test/processors/DoNothingProcessor.java     |  3 +
 streams-runtimes/streams-runtime-pig/pom.xml    | 51 +++++++++++-
 .../streams/pig/StreamsProcessorExec.java       |  6 +-
 .../src/test/java/PigProcessorTest.java         | 32 ++++++++
 .../src/test/java/PigSerializerTest.java        | 40 +++++++++
 .../src/test/resources/pigprocessortest.pig     |  7 ++
 .../src/test/resources/pigserializertest.pig    |  7 ++
 .../src/test/resources/serializertestin.txt     |  1 +
 .../src/test/resources/serializertestout.txt    |  1 +
 26 files changed, 428 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
index 45ec04d..2496061 100644
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
@@ -1,15 +1,29 @@
 package org.apache.streams.urls;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsJacksonModule;
 import org.apache.streams.urls.Link;
 import org.apache.streams.urls.LinkUnwinder;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.*;
 
 /**
@@ -28,7 +42,7 @@ public class LinkUnwinderProcessor implements StreamsProcessor
 
     private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class);
 
-
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
@@ -37,31 +51,47 @@ public class LinkUnwinderProcessor implements StreamsProcessor
 
         LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
 
+        Activity activity;
+
         // get list of shared urls
         if( entry.getDocument() instanceof Activity) {
-            Activity activity = (Activity) entry.getDocument();
-            List<String> inputLinks = activity.getLinks();
-            List<String> outputLinks = Lists.newArrayList();
-            for( String link : inputLinks ) {
-                try {
-                    LinkUnwinder unwinder = new LinkUnwinder((String)link);
-                    unwinder.run();
-                    if( !unwinder.isFailure()) {
-                        outputLinks.add(unwinder.getFinalURL());
-                    }
-                } catch (Exception e) {
-                    //if unwindable drop
-                    LOGGER.debug("Failed to unwind link : {}", link);
-                    LOGGER.debug("Excpetion unwind link : {}", e);
-                }
-            }
-            activity.setLinks(outputLinks);
+            activity = (Activity) entry.getDocument();
+
+            activity.setLinks(unwind(activity.getLinks()));
+
             entry.setDocument(activity);
+
             result.add(entry);
 
             return result;
+        } else if( entry.getDocument() instanceof String ) {
+
+            try {
+                activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+            } catch (Exception e) {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+                return(Lists.newArrayList(entry));
+            }
+
+            activity.setLinks(unwind(activity.getLinks()));
+
+            try {
+                entry.setDocument(mapper.writeValueAsString(activity));
+            } catch (JsonProcessingException e) {
+                e.printStackTrace();
+                LOGGER.warn(e.getMessage());
+                return(Lists.newArrayList(entry));
+            }
+
+            result.add(entry);
+
+            return result;
+
+        }
+        else {
+            return(Lists.newArrayList(entry));
         }
-        else throw new NotImplementedException();
     }
 
     @Override
@@ -73,4 +103,21 @@ public class LinkUnwinderProcessor implements StreamsProcessor
 
     }
 
+    private List<String> unwind(List<String> inputLinks) {
+        List<String> outputLinks = Lists.newArrayList();
+        for( String link : inputLinks ) {
+            try {
+                LinkUnwinder unwinder = new LinkUnwinder((String)link);
+                unwinder.run();
+                if( !unwinder.isFailure()) {
+                    outputLinks.add(unwinder.getFinalURL());
+                }
+            } catch (Exception e) {
+                //if unwindable drop
+                LOGGER.debug("Failed to unwind link : {}", link);
+                LOGGER.debug("Exception unwinding link : {}", e);
+            }
+        }
+        return outputLinks;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
index 94ae2d2..c6ccf24 100644
--- a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
+++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -24,31 +24,37 @@ public class TestLinkUnwinderProcessor {
     @Test
     public void testActivityLinkUnwinderProcessorBitly() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
     }
 
     @Test
     public void testActivityLinkUnwinderProcessorGoogle() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
     }
 
     @Test
     public void testActivityLinkUnwinderProcessorOwly() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
     }
 
     @Test
     public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
     }
 
     @Test
     public void testActivityLinkUnwinderProcessorMulti() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
     }
 
     @Test
     public void testActivityLinkUnwinderProcessorUnwindable() throws Exception{
         testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
     }
 
     public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
@@ -73,4 +79,28 @@ public class TestLinkUnwinderProcessor {
         assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
     }
 
+    public void testStringActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+        mapper.registerModule(new StreamsJacksonModule());
+        Activity activity = new Activity();
+        activity.setLinks(input);
+        String str = mapper.writeValueAsString(activity);
+        StreamsDatum datum = new StreamsDatum(str);
+        LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+        processor.prepare(null);
+        List<StreamsDatum> result = processor.process(datum);
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        StreamsDatum resultDatum = result.get(0);
+        assertNotNull(resultDatum);
+        assertTrue(resultDatum.getDocument() instanceof String);
+        String resultActivityString = (String) resultDatum.getDocument();
+        Activity resultActivity = mapper.readValue(resultActivityString, Activity.class);
+        assertNotNull(resultActivity.getLinks());
+        List<String> resultLinks = resultActivity.getLinks();
+        assertEquals(expected.size(), resultLinks.size());
+        assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 00c8032..2f2194f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -14,10 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +32,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
 
     private BlockingQueue<String> inQueue;
     private Queue<StreamsDatum> outQueue;
@@ -96,15 +93,15 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
             if( inClass.equals( Delete.class )) {
                 LOGGER.debug("ACTIVITY DELETE");
                 result = twitterJsonDeleteActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else if ( inClass.equals( Retweet.class )) {
                 LOGGER.debug("ACTIVITY RETWEET");
                 result = twitterJsonRetweetActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("ACTIVITY TWEET");
                 result = twitterJsonTweetActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index cc438b1..60f2ae7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -69,15 +69,15 @@ public class TwitterTypeConverter implements StreamsProcessor {
             if( inClass.equals( Delete.class )) {
                 LOGGER.debug("ACTIVITY DELETE");
                 result = twitterJsonDeleteActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else if ( inClass.equals( Retweet.class )) {
                 LOGGER.debug("ACTIVITY RETWEET");
                 result = twitterJsonRetweetActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else if ( inClass.equals( Tweet.class )) {
                 LOGGER.debug("ACTIVITY TWEET");
                 result = twitterJsonTweetActivitySerializer.deserialize(
-                        TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+                        mapper.writeValueAsString(event));
             } else {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
new file mode 100644
index 0000000..004e174
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -0,0 +1,49 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsJacksonModule;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsTwitterMapper extends StreamsJacksonMapper {
+
+    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+
+    private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper();
+
+    public static StreamsTwitterMapper getInstance(){
+        return INSTANCE;
+    }
+
+    private StreamsTwitterMapper() {
+        super();
+        registerModule(new SimpleModule()
+        {
+            {
+                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+                    @Override
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                        return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+                    }
+                });
+            }
+        });
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index fceff2c..bfceae0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
 import com.google.common.base.Joiner;
@@ -16,6 +17,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.jackson.StreamsJacksonModule;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Provider;
@@ -42,35 +44,6 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
 
     }
 
-    public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
-    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
-
-    public static ObjectMapper mapper;
-    static {
-        mapper = new ObjectMapper();
-        mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-        mapper.registerModule(new StreamsJacksonModule() {
-            {
-                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
-                    @Override
-                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
-                        return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
-                    }
-                });
-            }
-        });
-        //AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        //mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
-        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
-        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-
-    }
-
     TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
     TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
     TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index b24bc39..40be0f6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -1,6 +1,7 @@
 package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
 import org.apache.commons.lang.NotImplementedException;
@@ -47,9 +48,10 @@ public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<S
 
     public Activity convert(ObjectNode event) throws ActivitySerializerException {
 
+        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
         Delete delete = null;
         try {
-            delete = TwitterJsonActivitySerializer.mapper.treeToValue(event, Delete.class);
+            delete = mapper.treeToValue(event, Delete.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 7c08590..51e39b1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -1,6 +1,7 @@
 package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
@@ -49,9 +50,10 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
     @Override
     public Activity deserialize(String event) throws ActivitySerializerException {
 
+        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
         Retweet retweet = null;
         try {
-            retweet = TwitterJsonActivitySerializer.mapper.readValue(event, Retweet.class);
+            retweet = mapper.readValue(event, Retweet.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         } catch (IOException e) {
@@ -87,7 +89,7 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
         }
         activity.setUrl("http://twitter.com/" + retweet.getIdStr());
         activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet.getRetweetedStatus()));
-        addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(retweet, ObjectNode.class));
+        addTwitterExtension(activity, mapper.convertValue(retweet, ObjectNode.class));
         addLocationExtension(activity, retweet);
         return activity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index a038792..d258dac 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -1,6 +1,7 @@
 package org.apache.streams.twitter.serializer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
@@ -8,6 +9,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
@@ -49,9 +51,10 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
     @Override
     public Activity deserialize(String serialized) throws ActivitySerializerException {
 
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
         Tweet tweet = null;
         try {
-            tweet = TwitterJsonActivitySerializer.mapper.readValue(serialized, Tweet.class);
+            tweet = mapper.readValue(serialized, Tweet.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         } catch (IOException e) {
@@ -81,7 +84,7 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
         activity.setUrl("http://twitter.com/" + tweet.getIdStr());
         activity.setLinks(getLinks(tweet));
 
-        addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class));
+        addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
         addLocationExtension(activity, tweet);
         return activity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
index 494f698..b99a8be 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -8,6 +8,7 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
 import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertThat;
 public class TweetActivitySerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
+    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 
@@ -55,7 +57,7 @@ public class TweetActivitySerDeTest {
 
                     Activity activity = twitterJsonActivitySerializer.deserialize(line);
 
-                    String activitystring = TwitterJsonActivitySerializer.mapper.writeValueAsString(activity);
+                    String activitystring = mapper.writeValueAsString(activity);
 
                     LOGGER.info("activity: {}", activitystring);
 
@@ -72,7 +74,7 @@ public class TweetActivitySerDeTest {
 
                         assertEquals(activity.getVerb(), "post");
 
-                        Tweet tweet = TwitterJsonActivitySerializer.mapper.readValue(line, Tweet.class);
+                        Tweet tweet = mapper.readValue(line, Tweet.class);
 
                         if( tweet.getEntities() != null &&
                             tweet.getEntities().getUrls() != null &&
@@ -85,7 +87,7 @@ public class TweetActivitySerDeTest {
 
                     } else if( detected == Retweet.class ) {
 
-                        Retweet retweet = TwitterJsonActivitySerializer.mapper.readValue(line, Retweet.class);
+                        Retweet retweet = mapper.readValue(line, Retweet.class);
 
                         assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index bc7bcf7..1f6c86d 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertThat;
 public class TweetSerDeTest {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
-    private ObjectMapper mapper = TwitterJsonActivitySerializer.mapper;
+    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
 
     private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
new file mode 100644
index 0000000..0d5d095
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
@@ -0,0 +1,40 @@
+package org.apache.streams.exceptions;
+
+/**
+ * Checked exception type named for activity deserialization failures.
+ *
+ * Each constructor forwards its arguments unchanged to the matching
+ * {@link Exception} constructor; the class adds no state or behavior.
+ *
+ * Created by sblackmon on 3/25/14.
+ */
+public class ActivityDeserializerException extends Exception {
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public ActivityDeserializerException() {
+        super();
+    }
+
+    /**
+     * @param message detail message passed to {@link Exception#Exception(String)}
+     */
+    public ActivityDeserializerException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause underlying failure passed to {@link Exception#Exception(Throwable)}
+     */
+    public ActivityDeserializerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message detail message
+     * @param cause   underlying failure
+     */
+    public ActivityDeserializerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 4b3c4d1..f189aa9 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -8,6 +8,8 @@ import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.streams.exceptions.ActivityDeserializerException;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -26,7 +28,45 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
     }
 
+    /**
+     * Converts the current JSON value into a Joda {@link DateTime}.
+     *
+     * Three strategies are tried in order and the first that succeeds wins:
+     * {@code jpar.getText()} parsed with ACTIVITY_FORMAT, then
+     * {@code jpar.getValueAsString()} parsed with ACTIVITY_FORMAT, then
+     * {@code jpar.readValueAs(DateTime.class)}.
+     *
+     * @param jpar    parser supplying the value to convert
+     * @param context deserialization context; not consulted by this method
+     * @return the value produced by the first successful strategy
+     * @throws IOException when every strategy fails; the first failure is attached as the cause
+     */
     @Override
-    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
-        return ACTIVITY_FORMAT.parseDateTime(jpar.getValueAsString());
+    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+        // Describe the input up front: a failed readValueAs() below may move the parser.
+        String origin = jpar.getCurrentToken() + " at " + jpar.getCurrentLocation();
+        Exception firstFailure = null;
+
+        try {
+            return ACTIVITY_FORMAT.parseDateTime(jpar.getText());
+        } catch( Exception e ) {
+            // Keep it: this becomes the cause of the final error instead of vanishing.
+            firstFailure = e;
+        }
+
+        try {
+            return ACTIVITY_FORMAT.parseDateTime(jpar.getValueAsString());
+        } catch( Exception e ) {
+            // Best-effort fallback; firstFailure already records why parsing failed.
+        }
+
+        try {
+            // NOTE(review): if the parser's codec resolves DateTime back to this class,
+            // this call may re-enter deserialize() - confirm before relying on it.
+            return jpar.readValueAs(DateTime.class);
+        } catch( Exception e ) {
+            // Best-effort fallback; the error below reports the failure.
+        }
+
+        throw new IOException("could not deserialize DateTime from " + origin, firstFailure);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index b905bf4..94cf6c3 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -22,7 +22,18 @@ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
     }
 
+    /**
+     * Writes {@code value} as a JSON string: its epoch-millisecond instant
+     * formatted by ACTIVITY_FORMAT.
+     *
+     * @param value    date-time to write
+     * @param jgen     generator receiving the string
+     * @param provider serializer provider; not used by this method
+     * @throws IOException if the generator fails to write
+     */
     @Override
-    public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonGenerationException {
-        jgen.writeString(ACTIVITY_FORMAT.print(value));
+    public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+        // Format from the epoch-millisecond instant, then emit it as a JSON string.
+        final long epochMillis = value.getMillis();
+        jgen.writeString(ACTIVITY_FORMAT.print(epochMillis));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
new file mode 100644
index 0000000..7b1c41e
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -0,0 +1,57 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+
+/**
+ * {@link ObjectMapper} subclass that applies a fixed configuration in its
+ * constructor and exposes one shared instance through {@link #getInstance()}.
+ *
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsJacksonMapper extends ObjectMapper {
+
+    /** Shared instance, created once when the class is initialized. */
+    private static final StreamsJacksonMapper INSTANCE = new StreamsJacksonMapper();
+
+    /**
+     * @return the shared mapper; every call returns the same object
+     */
+    public static StreamsJacksonMapper getInstance(){
+        return INSTANCE;
+    }
+
+    /**
+     * Builds a mapper with {@link StreamsJacksonModule} registered and the
+     * serialization, deserialization, parser and visibility settings below applied.
+     */
+    public StreamsJacksonMapper() {
+        super();
+        registerModule(new StreamsJacksonModule());
+
+        // Serialization: dates are not written as numeric timestamps.
+        disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+
+        // Deserialization switches.
+        enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+        enable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE);
+        enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
+        enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
+        disable(DeserializationFeature.WRAP_EXCEPTIONS);
+
+        // Parser leniency and field visibility.
+        configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+        setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 8edd963..4c8a5aa 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -13,4 +13,6 @@ public class StreamsJacksonModule extends SimpleModule {
         addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
         addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 51be8c4..31e2660 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -115,6 +115,17 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
index 25c717b..6c47c41 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
@@ -13,6 +13,9 @@ public class DoNothingProcessor implements StreamsProcessor {
 
     List<StreamsDatum> result;
 
+    public DoNothingProcessor() { // explicit public no-arg constructor; the body is empty
+    }
+
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
         this.result = new LinkedList<StreamsDatum>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 4ceaf1d..4cbf488 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -41,6 +41,27 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>0.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- "test-jar" is a dependency type, not a scope; the attached test artifacts are consumed in test scope. -->
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>0.1-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-provider-twitter</artifactId>
+            <version>0.1-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>0.1-SNAPSHOT</version>
         </dependency>
@@ -55,7 +76,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
+            <artifactId>hadoop-client</artifactId>
             <version>2.0.0-cdh4.5.0.1-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
@@ -65,6 +86,24 @@
             <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.pig</groupId>
+            <artifactId>pigunit</artifactId>
+            <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>jline</groupId>
+            <artifactId>jline</artifactId>
+            <version>2.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr-runtime</artifactId>
+            <version>3.5.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -80,5 +119,18 @@
                 <directory>src/test/resources</directory>
             </testResource>
         </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
index 4203787..5e58e1e 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
@@ -1,5 +1,6 @@
 package org.apache.streams.pig;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -30,7 +31,10 @@ public class StreamsProcessorExec extends EvalFunc<DataBag> {
     StreamsProcessor streamsProcessor;
 
     public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
         String classFullName = execArgs[0];
+        Preconditions.checkNotNull(classFullName);
         String[] constructorArgs = new String[execArgs.length-1];
         ArrayUtils.remove(execArgs, 0);
         ArrayUtils.addAll(constructorArgs, execArgs);
@@ -46,7 +50,7 @@ public class StreamsProcessorExec extends EvalFunc<DataBag> {
 
         Configuration conf = UDFContext.getUDFContext().getJobConf();
 
-        Long id = (Long)line.get(0);
+        String id = (String)line.get(0);
         String source = (String)line.get(1);
         Long timestamp = (Long)line.get(2);
         String object = (String)line.get(3);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
new file mode 100644
index 0000000..003e49c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
@@ -0,0 +1,32 @@
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+/**
+ * Created by sblackmon on 3/30/14.
+ */
+public class PigProcessorTest {
+
+    @Ignore
+    @Test
+    public void testPigProcessor() throws Exception {
+        String[] args = {};
+
+        String[] input = {
+                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
 \":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://ti.me/zYyEtD\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_
 url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_borde
 r_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalP
 roperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMone
 yland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"
 profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3
 C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+        };
+
+        String[] output = {
+                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
 \":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://ti.me/zYyEtD\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_
 url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_borde
 r_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalP
 roperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMone
 yland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"
 profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3
 C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+        };
+
+        PigTest test;
+        test = new PigTest("src/test/resources/pigprocessortest.pig", args);
+        test.assertOutput("activities", input, "result", output);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
new file mode 100644
index 0000000..ffd8e75
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
@@ -0,0 +1,40 @@
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+
+/**
+ * Created by sblackmon on 3/30/14.
+ */
+public class PigSerializerTest {
+
+    @Ignore
+    @Test
+    public void testPigSerializer() throws Exception {
+        String[] args = {};
+
+        String input =
+                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create
 d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1
 700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr
 ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe
 et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"
 https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
+        ;
+
+        String[] output = {
+                "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
 \":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://business.time.com/2012/01/17/italy-cruise-disaster-a-disaster-for-the-entire-cruise-industry/\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\
 "245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Servic
 e.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_border_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Pic
 ture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship ac
 cident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\"
 ,\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_back
 ground_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+        };
+
+        //File output = new File("src/test/resources/serializertestout.txt");
+
+        PigTest test;
+        test = new PigTest("src/test/resources/pigserializertest.pig", args);
+        test.runScript();
+        Iterator<Tuple> inIterator = test.getAlias("tweets");
+        assert(ImmutableList.copyOf(inIterator)).size() == 1;
+        test.assertOutput("activities", output);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
new file mode 100644
index 0000000..6b1511a
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
@@ -0,0 +1,7 @@
+DEFINE NOOP_PROCESSOR org.apache.streams.pig.StreamsProcessorExec('org.apache.streams.local.test.processors.DoNothingProcessor');
+-- Input relation: four tab-separated columns; PigProcessorTest refers to this alias by name.
+activities = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
+-- NOTE(review): processed is never read below, so the UDF output does not feed result; confirm this is intended.
+processed = FOREACH activities GENERATE NOOP_PROCESSOR(activityid, source, timestamp, object);
+-- Output relation checked by PigProcessorTest: the input rows whose fourth column (object) is not null.
+result = FILTER activities BY object IS NOT NULL;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
new file mode 100644
index 0000000..e1820d9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
@@ -0,0 +1,7 @@
+DEFINE SERIALIZER org.apache.streams.pig.StreamsSerializerExec('org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer');
+-- Read the tab-separated fixture from src/test/resources, where serializertestin.txt is checked in (the previous src/main/resources path did not match the fixture's location).
+tweets = LOAD 'src/test/resources/serializertestin.txt' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
+-- Pass activityid, source and timestamp through unchanged and run the object column through the SERIALIZER UDF defined above.
+activities = FOREACH tweets GENERATE activityid, source, timestamp, SERIALIZER(object);
+-- Persist the resulting relation to target/tweets-activities.
+STORE activities INTO 'target/tweets-activities';
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
new file mode 100644
index 0000000..7a20515
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
@@ -0,0 +1 @@
+159475541894897679	twitter,statuses/user_timeline	1384499359006	{"retweeted_status":{"contributors":null,"text":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","geo":null,"retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","entities":{"symbols":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[80,100],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"}],"hashtags":[],"user_mentions":[{"id":245888431,"name":"TIME Moneyland","indices":[106,120],"screen_name":"TIMEMoneyland","id_str":"245888431"}]},"in_reply_to_status_id_str":null,"id":159470076259602432,"source":"<a href=\"http://www.hootsuite.com\" rel=\"nofollow\">HootSuite<\/a>","in_reply_to_user_id_str":null,"favorited":false,"in_reply_to_status_id":null,"retweet_count":71,"created_at":"Wed Jan 18 03:00:03 +0000 2012","in_reply_to_user_id":null,"favorite_count":14,"id_str":"159470076259
 602432","place":null,"user":{"location":"","default_profile":false,"profile_background_tile":true,"statuses_count":70754,"lang":"en","profile_link_color":"1B4F89","profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462","id":14293310,"following":false,"protected":false,"favourites_count":59,"profile_text_color":"000000","description":"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.","verified":true,"contributors_enabled":false,"profile_sidebar_border_color":"000000","name":"TIME.com","profile_background_color":"CC0000","created_at":"Thu Apr 03 13:54:30 +0000 2008","default_profile_image":false,"followers_count":5146268,"profile_image_url_https":"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","geo_enabled":false,"profile_background_image_url":"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","profile
 _background_image_url_https":"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","follow_request_sent":false,"entities":{"description":{"urls":[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"url":"http://t.co/4aYbUuAeSh","utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","notifications":false,"profile_use_background_image":true,"friends_count":742,"profile_sidebar_fill_color":"D9D9D9","screen_name":"TIME","id_str":"14293310","profile_image_url":"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","listed_count":76944,"is_translator":false},"coordinates":null},"contributors":null,"text":"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","geo":null,"retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lan
 g":"en","entities":{"symbols":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[90,110],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"}],"hashtags":[],"user_mentions":[{"id":14293310,"name":"TIME.com","indices":[3,8],"screen_name":"TIME","id_str":"14293310"},{"id":245888431,"name":"TIME Moneyland","indices":[116,130],"screen_name":"TIMEMoneyland","id_str":"245888431"}]},"in_reply_to_status_id_str":null,"id":159475541894897679,"source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone<\/a>","in_reply_to_user_id_str":null,"favorited":false,"in_reply_to_status_id":null,"retweet_count":71,"created_at":"Wed Jan 18 03:21:46 +0000 2012","in_reply_to_user_id":null,"favorite_count":0,"id_str":"159475541894897679","place":null,"user":{"location":"","default_profile":false,"profile_background_tile":true,"statuses_count":5053,"lang":"en","profile_link_color":"738D84","id":27552112,"following":false,"protected":false,"favourites_count":52,"p
 rofile_text_color":"97CEC9","description":"","verified":false,"contributors_enabled":false,"profile_sidebar_border_color":"A9AC00","name":"rafael medina-flores","profile_background_color":"C5EFE3","created_at":"Mon Mar 30 01:21:55 +0000 2009","default_profile_image":false,"followers_count":963,"profile_image_url_https":"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","geo_enabled":true,"profile_background_image_url":"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg","follow_request_sent":false,"entities":{"description":{"urls":[]}},"url":null,"utc_offset":-25200,"time_zone":"Mountain Time (US & Canada)","notifications":false,"profile_use_background_image":true,"friends_count":1800,"profile_sidebar_fill_color":"5C4F3C","screen_name":"rmedinaflores","id_str":"27552112","profile_image_url":"http://pbs.twimg.com/profile_images/2519547938/image
 _normal.jpg","listed_count":50,"is_translator":false},"coordinates":null}