You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/31 21:24:13 UTC
[1/2] attempting to fix jackson date deserialization setting up a pig
runtime test capability
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning 5d0f6bc42 -> 9edf1766f
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
new file mode 100644
index 0000000..fc036ee
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestout.txt
@@ -0,0 +1 @@
+159475541894897679 twitter,statuses/user_timeline 1384499359006 {"id":"id:twitter:share:159475541894897679","actor":{"id":"id:twitter:27552112","displayName":"rmedinaflores","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"verb":"share","object":{"id":"159470076259602432","objectType":"tweet","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"published":{"year":2012,"era":1,"dayOfMonth":17,"dayOfWeek":2,"dayOfYear":17,"weekOfWeekyear":3,"weekyear":2012,"monthOfYear":1,"yearOfEra":2012,"yearOfCentury":12,"centuryOfEra":20,"millisOfSecond":0,"millisOfDay":69706000,"secondOfMinute":46,"secondOfDay":69706,"minuteOfHour":21,"minuteOfDay":1161,"hourOfDay":19,"zone":{"fixed":false,"uncachedZone":{"cachable":true,"fixed":false,"id":"America/Los_Angeles"},"id":"America/Los_Angeles"},"millis":1326856906000,"chronology":{"zone":{"fixed":false,"uncachedZone":{"cachable":true,"fixed":false,"id":"America/Los_Angeles"},"id":"America/Los_Angeles"}},"afterNow":f
alse,"beforeNow":true,"equalNow":false},"provider":{"id":"id:providers:twitter","attachments":[],"upstreamDuplicates":[],"downstreamDuplicates":[]},"title":"","content":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","url":"http://twitter.com/159475541894897679","links":["http://ti.me/zYyEtD"],"extensions":{"twitter":{"retweeted_status":{"text":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","retweeted":false,"truncated":false,"entities":{"user_mentions":[{"id":245888431,"name":"TIME Moneyland","indices":[106,120],"screen_name":"TIMEMoneyland","id_str":"245888431","additionalProperties":{}}],"hashtags":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[80,100],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi","additionalProperties":{}}],"additionalProperties":{"symbols":[]},"symbols":[]},"id":159470076259602432,"sour
ce":"<a href=\"http://www.hootsuite.com\" rel=\"nofollow\">HootSuite</a>","lang":"en","favorited":false,"possibly_sensitive":false,"created_at":"20120117T190003.000-0800","retweet_count":71,"id_str":"159470076259602432","user":{"location":"","default_profile":false,"statuses_count":70754,"profile_background_tile":true,"lang":"en","profile_link_color":"1B4F89","id":14293310,"protected":false,"favourites_count":59,"profile_text_color":"000000","verified":true,"description":"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.","contributors_enabled":false,"name":"TIME.com","profile_sidebar_border_color":"000000","profile_background_color":"CC0000","created_at":"20080403T065430.000-0700","default_profile_image":false,"followers_count":5146268,"geo_enabled":false,"profile_image_url_https":"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","profile_background_image_url":"http://a0
.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","follow_request_sent":false,"url":"http://t.co/4aYbUuAeSh","utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","profile_use_background_image":true,"friends_count":742,"profile_sidebar_fill_color":"D9D9D9","screen_name":"TIME","id_str":"14293310","profile_image_url":"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","is_translator":false,"listed_count":76944,"additionalProperties":{"following":false,"notifications":false,"entities":{"description":{"urls":[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462"},"following":false,"notifications":false,"entities":{"description":{"urls"
:[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462"},"additionalProperties":{"geo":null,"favorite_count":14,"place":null},"geo":null,"favorite_count":14,"place":null},"additionalProperties":{"geo":null,"favorite_count":0,"place":null},"text":"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","retweeted":false,"truncated":false,"entities":{"user_mentions":[{"id":14293310,"name":"TIME.com","indices":[3,8],"screen_name":"TIME","id_str":"14293310","additionalProperties":{}},{"id":245888431,"name":"TIME Moneyland","indices":[116,130],"screen_name":"TIMEMoneyland","id_str":"245888431","additionalProperties":{}}],"hashtags":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[90,110],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"
,"additionalProperties":{}}],"additionalProperties":{"symbols":[]},"symbols":[]},"id":159475541894897679,"source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","lang":"en","favorited":false,"possibly_sensitive":false,"created_at":"20120117T192146.000-0800","retweet_count":71,"id_str":"159475541894897679","user":{"location":"","default_profile":false,"statuses_count":5053,"profile_background_tile":true,"lang":"en","profile_link_color":"738D84","id":27552112,"protected":false,"favourites_count":52,"profile_text_color":"97CEC9","verified":false,"description":"","contributors_enabled":false,"name":"rafael medina-flores","profile_sidebar_border_color":"A9AC00","profile_background_color":"C5EFE3","created_at":"20090329T182155.000-0700","default_profile_image":false,"followers_count":963,"geo_enabled":true,"profile_image_url_https":"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","profile_background_image_url":"http://a0.twimg.co
m/profile_background_images/167479660/trireme.jpg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg","follow_request_sent":false,"utc_offset":-25200,"time_zone":"Mountain Time (US & Canada)","profile_use_background_image":true,"friends_count":1800,"profile_sidebar_fill_color":"5C4F3C","screen_name":"rmedinaflores","id_str":"27552112","profile_image_url":"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","is_translator":false,"listed_count":50,"additionalProperties":{"following":false,"notifications":false,"entities":{"description":{"urls":[]}}},"following":false,"notifications":false,"entities":{"description":{"urls":[]}}},"geo":null,"favorite_count":0,"place":null},"location":{"id":"id:twitter:159475541894897679","coordinates":null}}}
[2/2] git commit: attempting to fix jackson date deserialization
setting up a pig runtime test capability
Posted by sb...@apache.org.
attempting to fix jackson date deserialization
setting up a pig runtime test capability
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9edf1766
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9edf1766
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9edf1766
Branch: refs/heads/springcleaning
Commit: 9edf1766f3f4b11f2353a15c9d76368a522f30e0
Parents: 5d0f6bc
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Mar 31 14:23:22 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Mar 31 14:23:36 2014 -0500
----------------------------------------------------------------------
.../streams/urls/LinkUnwinderProcessor.java | 85 +++++++++++++++-----
.../streams/urls/TestLinkUnwinderProcessor.java | 30 +++++++
.../processor/TwitterEventProcessor.java | 13 ++-
.../twitter/processor/TwitterTypeConverter.java | 6 +-
.../serializer/StreamsTwitterMapper.java | 49 +++++++++++
.../TwitterJsonActivitySerializer.java | 31 +------
.../TwitterJsonDeleteActivitySerializer.java | 4 +-
.../TwitterJsonRetweetActivitySerializer.java | 6 +-
.../TwitterJsonTweetActivitySerializer.java | 7 +-
.../twitter/test/TweetActivitySerDeTest.java | 8 +-
.../streams/twitter/test/TweetSerDeTest.java | 2 +-
.../ActivityDeserializerException.java | 27 +++++++
.../jackson/StreamsDateTimeDeserializer.java | 27 ++++++-
.../jackson/StreamsDateTimeSerializer.java | 6 +-
.../streams/jackson/StreamsJacksonMapper.java | 40 +++++++++
.../streams/jackson/StreamsJacksonModule.java | 2 +
streams-runtimes/streams-runtime-local/pom.xml | 11 +++
.../test/processors/DoNothingProcessor.java | 3 +
streams-runtimes/streams-runtime-pig/pom.xml | 51 +++++++++++-
.../streams/pig/StreamsProcessorExec.java | 6 +-
.../src/test/java/PigProcessorTest.java | 32 ++++++++
.../src/test/java/PigSerializerTest.java | 40 +++++++++
.../src/test/resources/pigprocessortest.pig | 7 ++
.../src/test/resources/pigserializertest.pig | 7 ++
.../src/test/resources/serializertestin.txt | 1 +
.../src/test/resources/serializertestout.txt | 1 +
26 files changed, 428 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
index 45ec04d..2496061 100644
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkUnwinderProcessor.java
@@ -1,15 +1,29 @@
package org.apache.streams.urls;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsJacksonModule;
import org.apache.streams.urls.Link;
import org.apache.streams.urls.LinkUnwinder;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.*;
/**
@@ -28,7 +42,7 @@ public class LinkUnwinderProcessor implements StreamsProcessor
private final static Logger LOGGER = LoggerFactory.getLogger(LinkUnwinderProcessor.class);
-
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
@@ -37,31 +51,47 @@ public class LinkUnwinderProcessor implements StreamsProcessor
LOGGER.debug("{} processing {}", STREAMS_ID, entry.getDocument().getClass());
+ Activity activity;
+
// get list of shared urls
if( entry.getDocument() instanceof Activity) {
- Activity activity = (Activity) entry.getDocument();
- List<String> inputLinks = activity.getLinks();
- List<String> outputLinks = Lists.newArrayList();
- for( String link : inputLinks ) {
- try {
- LinkUnwinder unwinder = new LinkUnwinder((String)link);
- unwinder.run();
- if( !unwinder.isFailure()) {
- outputLinks.add(unwinder.getFinalURL());
- }
- } catch (Exception e) {
- //if unwindable drop
- LOGGER.debug("Failed to unwind link : {}", link);
- LOGGER.debug("Excpetion unwind link : {}", e);
- }
- }
- activity.setLinks(outputLinks);
+ activity = (Activity) entry.getDocument();
+
+ activity.setLinks(unwind(activity.getLinks()));
+
entry.setDocument(activity);
+
result.add(entry);
return result;
+ } else if( entry.getDocument() instanceof String ) {
+
+ try {
+ activity = mapper.readValue((String) entry.getDocument(), Activity.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ return(Lists.newArrayList(entry));
+ }
+
+ activity.setLinks(unwind(activity.getLinks()));
+
+ try {
+ entry.setDocument(mapper.writeValueAsString(activity));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ LOGGER.warn(e.getMessage());
+ return(Lists.newArrayList(entry));
+ }
+
+ result.add(entry);
+
+ return result;
+
+ }
+ else {
+ return(Lists.newArrayList(entry));
}
- else throw new NotImplementedException();
}
@Override
@@ -73,4 +103,21 @@ public class LinkUnwinderProcessor implements StreamsProcessor
}
+ private List<String> unwind(List<String> inputLinks) {
+ List<String> outputLinks = Lists.newArrayList();
+ for( String link : inputLinks ) {
+ try {
+ LinkUnwinder unwinder = new LinkUnwinder((String)link);
+ unwinder.run();
+ if( !unwinder.isFailure()) {
+ outputLinks.add(unwinder.getFinalURL());
+ }
+ } catch (Exception e) {
+ //if unwindable drop
+ LOGGER.debug("Failed to unwind link : {}", link);
+ LOGGER.debug("Exception unwinding link : {}", e);
+ }
+ }
+ return outputLinks;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
index 94ae2d2..c6ccf24 100644
--- a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
+++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -24,31 +24,37 @@ public class TestLinkUnwinderProcessor {
@Test
public void testActivityLinkUnwinderProcessorBitly() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/"));
}
@Test
public void testActivityLinkUnwinderProcessorGoogle() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/"));
}
@Test
public void testActivityLinkUnwinderProcessorOwly() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/"));
}
@Test
public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/"));
}
@Test
public void testActivityLinkUnwinderProcessorMulti() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/", "http://www.wcgworld.com/", "http://www.wcgworld.com/"));
}
@Test
public void testActivityLinkUnwinderProcessorUnwindable() throws Exception{
testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
+ testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4", "http://nope@#$%"), Lists.newArrayList("http://www.wcgworld.com/"));
}
public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
@@ -73,4 +79,28 @@ public class TestLinkUnwinderProcessor {
assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
}
+ public void testStringActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ mapper.registerModule(new StreamsJacksonModule());
+ Activity activity = new Activity();
+ activity.setLinks(input);
+ String str = mapper.writeValueAsString(activity);
+ StreamsDatum datum = new StreamsDatum(str);
+ LinkUnwinderProcessor processor = new LinkUnwinderProcessor();
+ processor.prepare(null);
+ List<StreamsDatum> result = processor.process(datum);
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ StreamsDatum resultDatum = result.get(0);
+ assertNotNull(resultDatum);
+ assertTrue(resultDatum.getDocument() instanceof String);
+ String resultActivityString = (String) resultDatum.getDocument();
+ Activity resultActivity = mapper.readValue(resultActivityString, Activity.class);
+ assertNotNull(resultActivity.getLinks());
+ List<String> resultLinks = resultActivity.getLinks();
+ assertEquals(expected.size(), resultLinks.size());
+ assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
index 00c8032..2f2194f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java
@@ -14,10 +14,7 @@ import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +32,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
private BlockingQueue<String> inQueue;
private Queue<StreamsDatum> outQueue;
@@ -96,15 +93,15 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable {
if( inClass.equals( Delete.class )) {
LOGGER.debug("ACTIVITY DELETE");
result = twitterJsonDeleteActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else if ( inClass.equals( Retweet.class )) {
LOGGER.debug("ACTIVITY RETWEET");
result = twitterJsonRetweetActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else if ( inClass.equals( Tweet.class )) {
LOGGER.debug("ACTIVITY TWEET");
result = twitterJsonTweetActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index cc438b1..60f2ae7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -69,15 +69,15 @@ public class TwitterTypeConverter implements StreamsProcessor {
if( inClass.equals( Delete.class )) {
LOGGER.debug("ACTIVITY DELETE");
result = twitterJsonDeleteActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else if ( inClass.equals( Retweet.class )) {
LOGGER.debug("ACTIVITY RETWEET");
result = twitterJsonRetweetActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else if ( inClass.equals( Tweet.class )) {
LOGGER.debug("ACTIVITY TWEET");
result = twitterJsonTweetActivitySerializer.deserialize(
- TwitterJsonActivitySerializer.mapper.writeValueAsString(event));
+ mapper.writeValueAsString(event));
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
new file mode 100644
index 0000000..004e174
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -0,0 +1,49 @@
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsJacksonModule;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsTwitterMapper extends StreamsJacksonMapper {
+
+ public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+
+ private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper();
+
+ public static StreamsTwitterMapper getInstance(){
+ return INSTANCE;
+ }
+
+ private StreamsTwitterMapper() {
+ super();
+ registerModule(new SimpleModule()
+ {
+ {
+ addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+ @Override
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+ return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
+ }
+ });
+ }
+ });
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index fceff2c..bfceae0 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import com.google.common.base.Joiner;
@@ -16,6 +17,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.jackson.StreamsJacksonModule;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Provider;
@@ -42,35 +44,6 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
}
- public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
- public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
-
- public static ObjectMapper mapper;
- static {
- mapper = new ObjectMapper();
- mapper.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- mapper.registerModule(new StreamsJacksonModule() {
- {
- addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
- @Override
- public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
- return TWITTER_FORMAT.parseDateTime(jpar.getValueAsString());
- }
- });
- }
- });
- //AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
- //mapper.setAnnotationIntrospector(introspector);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
- mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
- mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-
- }
-
TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
index b24bc39..40be0f6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java
@@ -1,6 +1,7 @@
package org.apache.streams.twitter.serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
import org.apache.commons.lang.NotImplementedException;
@@ -47,9 +48,10 @@ public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<S
public Activity convert(ObjectNode event) throws ActivitySerializerException {
+ ObjectMapper mapper = StreamsTwitterMapper.getInstance();
Delete delete = null;
try {
- delete = TwitterJsonActivitySerializer.mapper.treeToValue(event, Delete.class);
+ delete = mapper.treeToValue(event, Delete.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 7c08590..51e39b1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -1,6 +1,7 @@
package org.apache.streams.twitter.serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
@@ -49,9 +50,10 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
@Override
public Activity deserialize(String event) throws ActivitySerializerException {
+ ObjectMapper mapper = StreamsTwitterMapper.getInstance();
Retweet retweet = null;
try {
- retweet = TwitterJsonActivitySerializer.mapper.readValue(event, Retweet.class);
+ retweet = mapper.readValue(event, Retweet.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (IOException e) {
@@ -87,7 +89,7 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
}
activity.setUrl("http://twitter.com/" + retweet.getIdStr());
activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet.getRetweetedStatus()));
- addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(retweet, ObjectNode.class));
+ addTwitterExtension(activity, mapper.convertValue(retweet, ObjectNode.class));
addLocationExtension(activity, retweet);
return activity;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index a038792..d258dac 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -1,6 +1,7 @@
package org.apache.streams.twitter.serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
@@ -8,6 +9,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
@@ -49,9 +51,10 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
@Override
public Activity deserialize(String serialized) throws ActivitySerializerException {
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
Tweet tweet = null;
try {
- tweet = TwitterJsonActivitySerializer.mapper.readValue(serialized, Tweet.class);
+ tweet = mapper.readValue(serialized, Tweet.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (IOException e) {
@@ -81,7 +84,7 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
activity.setUrl("http://twitter.com/" + tweet.getIdStr());
activity.setLinks(getLinks(tweet));
- addTwitterExtension(activity, TwitterJsonActivitySerializer.mapper.convertValue(tweet, ObjectNode.class));
+ addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
addLocationExtension(activity, tweet);
return activity;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
index 494f698..b99a8be 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java
@@ -8,6 +8,7 @@ import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
import org.junit.Assert;
import org.junit.Test;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertThat;
public class TweetActivitySerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class);
+ private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
@@ -55,7 +57,7 @@ public class TweetActivitySerDeTest {
Activity activity = twitterJsonActivitySerializer.deserialize(line);
- String activitystring = TwitterJsonActivitySerializer.mapper.writeValueAsString(activity);
+ String activitystring = mapper.writeValueAsString(activity);
LOGGER.info("activity: {}", activitystring);
@@ -72,7 +74,7 @@ public class TweetActivitySerDeTest {
assertEquals(activity.getVerb(), "post");
- Tweet tweet = TwitterJsonActivitySerializer.mapper.readValue(line, Tweet.class);
+ Tweet tweet = mapper.readValue(line, Tweet.class);
if( tweet.getEntities() != null &&
tweet.getEntities().getUrls() != null &&
@@ -85,7 +87,7 @@ public class TweetActivitySerDeTest {
} else if( detected == Retweet.class ) {
- Retweet retweet = TwitterJsonActivitySerializer.mapper.readValue(line, Retweet.class);
+ Retweet retweet = mapper.readValue(line, Retweet.class);
assertThat(retweet.getRetweetedStatus(), is(not(nullValue())));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index bc7bcf7..1f6c86d 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertThat;
public class TweetSerDeTest {
private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class);
- private ObjectMapper mapper = TwitterJsonActivitySerializer.mapper;
+ private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
new file mode 100644
index 0000000..0d5d095
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/exceptions/ActivityDeserializerException.java
@@ -0,0 +1,27 @@
+package org.apache.streams.exceptions;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ */
+public class ActivityDeserializerException extends Exception {
+
+ public ActivityDeserializerException() {
+ // TODO Auto-generated constructor stub
+ }
+
+ public ActivityDeserializerException(String message) {
+ super(message);
+ // TODO Auto-generated constructor stub
+ }
+
+ public ActivityDeserializerException(Throwable cause) {
+ super(cause);
+ // TODO Auto-generated constructor stub
+ }
+
+ public ActivityDeserializerException(String message, Throwable cause) {
+ super(message, cause);
+ // TODO Auto-generated constructor stub
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 4b3c4d1..f189aa9 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -8,6 +8,8 @@ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.streams.exceptions.ActivityDeserializerException;
+import org.apache.streams.exceptions.ActivitySerializerException;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
@@ -26,7 +28,28 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
}
@Override
- public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
- return ACTIVITY_FORMAT.parseDateTime(jpar.getValueAsString());
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+ DateTime result = null;
+
+ try {
+ result = ACTIVITY_FORMAT.parseDateTime(jpar.getText());
+ return result;
+ } catch( Exception e ) {}
+
+ try {
+ result = ACTIVITY_FORMAT.parseDateTime(jpar.getValueAsString());
+ return result;
+ } catch( Exception e ) {}
+
+
+ try {
+ result = jpar.readValueAs(DateTime.class);
+ return result;
+ } catch( Exception e ) {}
+
+ if( result == null )
+ throw new IOException(" could not deserialize " + jpar.toString());
+
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index b905bf4..94cf6c3 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -22,7 +22,9 @@ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
}
@Override
- public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonGenerationException {
- jgen.writeString(ACTIVITY_FORMAT.print(value));
+ public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+ long timestamp = value.getMillis();
+ String result = ACTIVITY_FORMAT.print(timestamp);
+ jgen.writeString(result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
new file mode 100644
index 0000000..7b1c41e
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -0,0 +1,40 @@
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsJacksonMapper extends ObjectMapper {
+
+ private static final StreamsJacksonMapper INSTANCE = new StreamsJacksonMapper();
+
+ public static StreamsJacksonMapper getInstance(){
+ return INSTANCE;
+ }
+
+ public StreamsJacksonMapper() {
+ super();
+ registerModule(new StreamsJacksonModule());
+ disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
+ configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.FALSE);
+ configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, Boolean.TRUE);
+ setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index 8edd963..4c8a5aa 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -13,4 +13,6 @@ public class StreamsJacksonModule extends SimpleModule {
addSerializer(DateTime.class, new StreamsDateTimeSerializer(DateTime.class));
addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 51be8c4..31e2660 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -115,6 +115,17 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
index 25c717b..6c47c41 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java
@@ -13,6 +13,9 @@ public class DoNothingProcessor implements StreamsProcessor {
List<StreamsDatum> result;
+ public DoNothingProcessor() {
+ }
+
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
this.result = new LinkedList<StreamsDatum>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index 4ceaf1d..4cbf488 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -41,6 +41,24 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <scope>test-jar</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-provider-twitter</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ <scope>test-jar</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
@@ -55,7 +73,7 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
+ <artifactId>hadoop-client</artifactId>
<version>2.0.0-cdh4.5.0.1-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
@@ -65,6 +83,24 @@
<version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pigunit</artifactId>
+ <version>0.11.0-cdh4.5.0.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>2.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ <version>3.5.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -80,5 +116,18 @@
<directory>src/test/resources</directory>
</testResource>
</testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
index 4203787..5e58e1e 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
@@ -1,5 +1,6 @@
package org.apache.streams.pig;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +31,10 @@ public class StreamsProcessorExec extends EvalFunc<DataBag> {
StreamsProcessor streamsProcessor;
public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{
+ Preconditions.checkNotNull(execArgs);
+ Preconditions.checkArgument(execArgs.length > 0);
String classFullName = execArgs[0];
+ Preconditions.checkNotNull(classFullName);
String[] constructorArgs = new String[execArgs.length-1];
ArrayUtils.remove(execArgs, 0);
ArrayUtils.addAll(constructorArgs, execArgs);
@@ -46,7 +50,7 @@ public class StreamsProcessorExec extends EvalFunc<DataBag> {
Configuration conf = UDFContext.getUDFContext().getJobConf();
- Long id = (Long)line.get(0);
+ String id = (String)line.get(0);
String source = (String)line.get(1);
Long timestamp = (Long)line.get(2);
String object = (String)line.get(3);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
new file mode 100644
index 0000000..003e49c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/PigProcessorTest.java
@@ -0,0 +1,32 @@
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+/**
+ * Created by sblackmon on 3/30/14.
+ */
+public class PigProcessorTest {
+
+ @Ignore
+ @Test
+ public void testPigProcessor() throws Exception {
+ String[] args = {};
+
+ String[] input = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://ti.me/zYyEtD\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_
url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_borde
r_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalP
roperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMone
yland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"
profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3
C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+ };
+
+ String[] output = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://ti.me/zYyEtD\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_
url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_borde
r_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalP
roperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMone
yland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"
profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3
C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+ };
+
+ PigTest test;
+ test = new PigTest("src/test/resources/pigprocessortest.pig", args);
+ test.assertOutput("activities", input, "result", output);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
new file mode 100644
index 0000000..ffd8e75
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/PigSerializerTest.java
@@ -0,0 +1,40 @@
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+
+/**
+ * Created by sblackmon on 3/30/14.
+ */
+public class PigSerializerTest {
+
+ @Ignore
+ @Test
+ public void testPigSerializer() throws Exception {
+ String[] args = {};
+
+ String input =
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create
d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1
700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr
ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe
et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"
https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
+ ;
+
+ String[] output = {
+ "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"id\":\"id:twitter:share:159475541894897679\",\"actor\":{\"id\":\"id:twitter:27552112\",\"displayName\":\"rmedinaflores\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"verb\":\"share\",\"object\":{\"id\":\"159470076259602432\",\"objectType\":\"tweet\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"published\":{\"year\":2012,\"era\":1,\"dayOfMonth\":17,\"dayOfWeek\":2,\"dayOfYear\":17,\"weekOfWeekyear\":3,\"weekyear\":2012,\"monthOfYear\":1,\"yearOfEra\":2012,\"yearOfCentury\":12,\"centuryOfEra\":20,\"millisOfSecond\":0,\"millisOfDay\":69706000,\"secondOfMinute\":46,\"secondOfDay\":69706,\"minuteOfHour\":21,\"minuteOfDay\":1161,\"hourOfDay\":19,\"zone\":{\"fixed\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"},\"millis\":1326856906000,\"chronology\":{\"zone\":{\"fixed
\":false,\"uncachedZone\":{\"cachable\":true,\"fixed\":false,\"id\":\"America/Los_Angeles\"},\"id\":\"America/Los_Angeles\"}},\"afterNow\":false,\"beforeNow\":true,\"equalNow\":false},\"provider\":{\"id\":\"id:providers:twitter\",\"attachments\":[],\"upstreamDuplicates\":[],\"downstreamDuplicates\":[]},\"title\":\"\",\"content\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"url\":\"http://twitter.com/159475541894897679\",\"links\":[\"http://business.time.com/2012/01/17/italy-cruise-disaster-a-disaster-for-the-entire-cruise-industry/\"],\"extensions\":{\"twitter\":{\"retweeted_status\":{\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\
"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T190003.000-0800\",\"retweet_count\":71,\"id_str\":\"159470076259602432\",\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":70754,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"id\":14293310,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"verified\":true,\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Servic
e.\",\"contributors_enabled\":false,\"name\":\"TIME.com\",\"profile_sidebar_border_color\":\"000000\",\"profile_background_color\":\"CC0000\",\"created_at\":\"20080403T065430.000-0700\",\"default_profile_image\":false,\"followers_count\":5146268,\"geo_enabled\":false,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Pic
ture_24_normal.png\",\"is_translator\":false,\"listed_count\":76944,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\"},\"additionalProperties\":{\"geo\":null,\"favorite_count\":14,\"place\":null},\"geo\":null,\"favorite_count\":14,\"place\":null},\"additionalProperties\":{\"geo\":null,\"favorite_count\":0,\"place\":null},\"text\":\"RT @TIME: The Costa Concordia cruise ship ac
cident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"retweeted\":false,\"truncated\":false,\"entities\":{\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"additionalProperties\":{}},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\",\"additionalProperties\":{}}],\"hashtags\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\",\"additionalProperties\":{}}],\"additionalProperties\":{\"symbols\":[]},\"symbols\":[]},\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"lang\":\"en\",\"favorited\":false,\"possibly_sensitive\":false,\"created_at\":\"20120117T192146.000-0800\",\"retweet_count\":71,\"id_str\":\"159475541894897679\"
,\"user\":{\"location\":\"\",\"default_profile\":false,\"statuses_count\":5053,\"profile_background_tile\":true,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"verified\":false,\"description\":\"\",\"contributors_enabled\":false,\"name\":\"rafael medina-flores\",\"profile_sidebar_border_color\":\"A9AC00\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"20090329T182155.000-0700\",\"default_profile_image\":false,\"followers_count\":963,\"geo_enabled\":true,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"profile_use_back
ground_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"is_translator\":false,\"listed_count\":50,\"additionalProperties\":{\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"following\":false,\"notifications\":false,\"entities\":{\"description\":{\"urls\":[]}}},\"geo\":null,\"favorite_count\":0,\"place\":null},\"location\":{\"id\":\"id:twitter:159475541894897679\",\"coordinates\":null}}}",
+ };
+
+ //File output = new File("src/test/resources/serializertestout.txt");
+
+ PigTest test;
+ test = new PigTest("src/test/resources/pigserializertest.pig", args);
+ test.runScript();
+ Iterator<Tuple> inIterator = test.getAlias("tweets");
+ assert(ImmutableList.copyOf(inIterator)).size() == 1;
+ test.assertOutput("activities", output);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
new file mode 100644
index 0000000..6b1511a
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
@@ -0,0 +1,7 @@
+DEFINE UNWINDER org.apache.streams.pig.StreamsProcessorExec('org.apache.streams.local.test.processors.DoNothingProcessor');
+
+activities = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
+
+unwound = FOREACH activities GENERATE UNWINDER(activityid, source, timestamp, object);
+
+result = FILTER activities BY $3 IS NOT NULL;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
new file mode 100644
index 0000000..e1820d9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig
@@ -0,0 +1,7 @@
+DEFINE SERIALIZER org.apache.streams.pig.StreamsSerializerExec('org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer');
+
+tweets = LOAD 'src/main/resources/serializertestin.txt' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
+
+activities = FOREACH tweets GENERATE activityid, source, timestamp, SERIALIZER(object);
+
+STORE activities INTO 'target/tweets-activities';
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9edf1766/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
new file mode 100644
index 0000000..7a20515
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/serializertestin.txt
@@ -0,0 +1 @@
+159475541894897679 twitter,statuses/user_timeline 1384499359006 {"retweeted_status":{"contributors":null,"text":"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","geo":null,"retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","entities":{"symbols":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[80,100],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"}],"hashtags":[],"user_mentions":[{"id":245888431,"name":"TIME Moneyland","indices":[106,120],"screen_name":"TIMEMoneyland","id_str":"245888431"}]},"in_reply_to_status_id_str":null,"id":159470076259602432,"source":"<a href=\"http://www.hootsuite.com\" rel=\"nofollow\">HootSuite<\/a>","in_reply_to_user_id_str":null,"favorited":false,"in_reply_to_status_id":null,"retweet_count":71,"created_at":"Wed Jan 18 03:00:03 +0000 2012","in_reply_to_user_id":null,"favorite_count":14,"id_str":"159470076259
602432","place":null,"user":{"location":"","default_profile":false,"profile_background_tile":true,"statuses_count":70754,"lang":"en","profile_link_color":"1B4F89","profile_banner_url":"https://pbs.twimg.com/profile_banners/14293310/1355243462","id":14293310,"following":false,"protected":false,"favourites_count":59,"profile_text_color":"000000","description":"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.","verified":true,"contributors_enabled":false,"profile_sidebar_border_color":"000000","name":"TIME.com","profile_background_color":"CC0000","created_at":"Thu Apr 03 13:54:30 +0000 2008","default_profile_image":false,"followers_count":5146268,"profile_image_url_https":"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","geo_enabled":false,"profile_background_image_url":"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","profile
_background_image_url_https":"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg","follow_request_sent":false,"entities":{"description":{"urls":[]},"url":{"urls":[{"expanded_url":"http://www.time.com","indices":[0,22],"display_url":"time.com","url":"http://t.co/4aYbUuAeSh"}]}},"url":"http://t.co/4aYbUuAeSh","utc_offset":-18000,"time_zone":"Eastern Time (US & Canada)","notifications":false,"profile_use_background_image":true,"friends_count":742,"profile_sidebar_fill_color":"D9D9D9","screen_name":"TIME","id_str":"14293310","profile_image_url":"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png","listed_count":76944,"is_translator":false},"coordinates":null},"contributors":null,"text":"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)","geo":null,"retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lan
g":"en","entities":{"symbols":[],"urls":[{"expanded_url":"http://ti.me/zYyEtD","indices":[90,110],"display_url":"ti.me/zYyEtD","url":"http://t.co/M9UUNvZi"}],"hashtags":[],"user_mentions":[{"id":14293310,"name":"TIME.com","indices":[3,8],"screen_name":"TIME","id_str":"14293310"},{"id":245888431,"name":"TIME Moneyland","indices":[116,130],"screen_name":"TIMEMoneyland","id_str":"245888431"}]},"in_reply_to_status_id_str":null,"id":159475541894897679,"source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone<\/a>","in_reply_to_user_id_str":null,"favorited":false,"in_reply_to_status_id":null,"retweet_count":71,"created_at":"Wed Jan 18 03:21:46 +0000 2012","in_reply_to_user_id":null,"favorite_count":0,"id_str":"159475541894897679","place":null,"user":{"location":"","default_profile":false,"profile_background_tile":true,"statuses_count":5053,"lang":"en","profile_link_color":"738D84","id":27552112,"following":false,"protected":false,"favourites_count":52,"p
rofile_text_color":"97CEC9","description":"","verified":false,"contributors_enabled":false,"profile_sidebar_border_color":"A9AC00","name":"rafael medina-flores","profile_background_color":"C5EFE3","created_at":"Mon Mar 30 01:21:55 +0000 2009","default_profile_image":false,"followers_count":963,"profile_image_url_https":"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg","geo_enabled":true,"profile_background_image_url":"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg","profile_background_image_url_https":"https://si0.twimg.com/profile_background_images/167479660/trireme.jpg","follow_request_sent":false,"entities":{"description":{"urls":[]}},"url":null,"utc_offset":-25200,"time_zone":"Mountain Time (US & Canada)","notifications":false,"profile_use_background_image":true,"friends_count":1800,"profile_sidebar_fill_color":"5C4F3C","screen_name":"rmedinaflores","id_str":"27552112","profile_image_url":"http://pbs.twimg.com/profile_images/2519547938/image
_normal.jpg","listed_count":50,"is_translator":false},"coordinates":null}