You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/01 18:44:10 UTC

git commit: attempting to fix jackson date deserialization

Repository: incubator-streams
Updated Branches:
  refs/heads/springcleaning cd4355ae4 -> a93bb176d


attempting to fix jackson date deserialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a93bb176
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a93bb176
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a93bb176

Branch: refs/heads/springcleaning
Commit: a93bb176d1c17322b860966553371fe3fefd5596
Parents: cd4355a
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Apr 1 11:43:48 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Apr 1 11:43:48 2014 -0500

----------------------------------------------------------------------
 .../streams/twitter/test/SimpleTweetTest.java   | 94 ++++++++++++++++++++
 .../apache/streams/data/util/RFC3339Utils.java  |  7 ++
 .../jackson/StreamsDateTimeDeserializer.java    |  2 +-
 .../jackson/StreamsDateTimeSerializer.java      |  2 +-
 .../streams/pig/StreamsProcessDatumExec.java    | 81 +++++++++++++++++
 .../streams/pig/StreamsProcessDocumentExec.java | 83 +++++++++++++++++
 .../streams/pig/StreamsProcessorExec.java       | 81 -----------------
 .../streams/pig/StreamsSerializerExec.java      |  2 -
 .../src/test/resources/pigprocessortest.pig     |  2 +-
 .../org/apache/streams/util/ComponentUtils.java | 22 +++++
 10 files changed, 290 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
new file mode 100644
index 0000000..8988de0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -0,0 +1,94 @@
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class SimpleTweetTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class);
+    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
 t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
 e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
 DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
 img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
 371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
+
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+
+
+    //    @Ignore
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = SimpleTweetTest.class.getResourceAsStream("/testtweets.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        ObjectNode event = null;
+        try {
+            event = (ObjectNode) mapper.readTree(TWITTER_JSON);
+        } catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        assertThat(event, is(not(nullValue())));
+
+        Tweet tweet = mapper.convertValue(event, Tweet.class);
+
+        assertThat(tweet, is(not(nullValue())));
+        assertThat(tweet.getCreatedAt(), is(not(nullValue())));
+        assertThat(tweet.getText(), is(not(nullValue())));
+        assertThat(tweet.getUser(), is(not(nullValue())));
+
+        Activity activity = null;
+        try {
+            activity = twitterJsonActivitySerializer.deserialize(TWITTER_JSON);
+        } catch (ActivitySerializerException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        assertThat(activity, is(not(nullValue())));
+
+        assertThat(activity.getId(), is(not(nullValue())));
+        assertThat(activity.getActor(), is(not(nullValue())));
+        assertThat(activity.getActor().getId(), is(not(nullValue())));
+        assertThat(activity.getVerb(), is(not(nullValue())));
+        assertThat(activity.getProvider(), is(not(nullValue())));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
index 7af16c4..473d6bb 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
@@ -30,6 +30,13 @@ import java.util.regex.Pattern;
  * Parses and formats Joda Time {@link org.joda.time.DateTime} dates to and from RFC3339 compatible Strings
  */
 public class RFC3339Utils {
+
+    private static final RFC3339Utils INSTANCE = new RFC3339Utils();
+
+    public static RFC3339Utils getInstance(){
+        return INSTANCE;
+    }
+
     private static final String BASE = "^[0-9]{4}\\-[0-9]{2}\\-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}";
     private static final String TZ = "[+-][0-9]{2}:?[0-9]{2}$";
     private static final String SUB_SECOND = "\\.([0-9]*)";

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 8d56c53..3286e74 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -19,6 +19,6 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
 
     @Override
     public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
-        return RFC3339Utils.parseUTC(jpar.getValueAsString());
+        return RFC3339Utils.getInstance().parseUTC(jpar.getValueAsString());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index 4677fce..26bc157 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -23,6 +23,6 @@ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
 
     @Override
     public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
-        jgen.writeString(RFC3339Utils.format(value));
+        jgen.writeString(RFC3339Utils.getInstance().format(value));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
new file mode 100644
index 0000000..542c106
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -0,0 +1,81 @@
+package org.apache.streams.pig;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
+public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
+
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    StreamsProcessor streamsProcessor;
+
+    public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
+        String classFullName = execArgs[0];
+        Preconditions.checkNotNull(classFullName);
+        String[] constructorArgs = new String[execArgs.length-1];
+        ArrayUtils.remove(execArgs, 0);
+        ArrayUtils.addAll(constructorArgs, execArgs);
+        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+        streamsProcessor.prepare(null);
+    }
+
+    @Override
+    public DataBag exec(Tuple line) throws IOException {
+
+        if (line == null || line.size() == 0)
+            return null;
+
+        Configuration conf = UDFContext.getUDFContext().getJobConf();
+
+        String id = (String)line.get(0);
+        String provider = (String)line.get(1);
+        Long timestamp = (Long)line.get(2);
+        String object = (String)line.get(3);
+
+        StreamsDatum entry = new StreamsDatum(object);
+
+        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+        List<Tuple> resultTupleList = Lists.newArrayList();
+
+        for( StreamsDatum resultDatum : resultSet ) {
+            Tuple tuple = mTupleFactory.newTuple();
+            tuple.append(id);
+            tuple.append(provider);
+            tuple.append(timestamp);
+            tuple.append(resultDatum.getDocument());
+            resultTupleList.add(tuple);
+        }
+
+        DataBag result = mBagFactory.newDefaultBag(resultTupleList);
+
+        return result;
+
+    }
+
+    public void finish() {
+        streamsProcessor.cleanUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
new file mode 100644
index 0000000..469aa3f
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -0,0 +1,83 @@
+package org.apache.streams.pig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by sblackmon on 3/25/14.
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
+public class StreamsProcessDocumentExec extends EvalFunc<String> {
+
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    StreamsProcessor streamsProcessor;
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
+        String processorFullName = execArgs[0];
+        Preconditions.checkNotNull(processorFullName);
+        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(processorFullName));
+        streamsProcessor.prepare(null);
+    }
+
+    @Override
+    public String exec(Tuple input) throws IOException {
+
+        Preconditions.checkNotNull(streamsProcessor);
+        Preconditions.checkNotNull(input);
+        Preconditions.checkArgument(input.size() == 1);
+
+        String document = (String) input.get(0);
+
+        Preconditions.checkNotNull(document);
+
+        StreamsDatum entry = new StreamsDatum(document);
+
+        Preconditions.checkNotNull(entry);
+
+        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+
+        Object resultDoc = null;
+        for( StreamsDatum resultDatum : resultSet ) {
+            resultDoc = resultDatum.getDocument();
+        }
+
+        Preconditions.checkNotNull(resultDoc);
+
+        if( resultDoc instanceof String )
+            return (String) resultDoc;
+        else if( resultDoc instanceof ObjectNode)
+            return mapper.writeValueAsString(resultDoc);
+        else
+            return null;
+
+    }
+
+    public void finish() {
+        streamsProcessor.cleanUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
deleted file mode 100644
index addded4..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.streams.pig;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by sblackmon on 3/25/14.
- */
-@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessorExec extends EvalFunc<DataBag> {
-
-    TupleFactory mTupleFactory = TupleFactory.getInstance();
-    BagFactory mBagFactory = BagFactory.getInstance();
-
-    StreamsProcessor streamsProcessor;
-
-    public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{
-        Preconditions.checkNotNull(execArgs);
-        Preconditions.checkArgument(execArgs.length > 0);
-        String classFullName = execArgs[0];
-        Preconditions.checkNotNull(classFullName);
-        String[] constructorArgs = new String[execArgs.length-1];
-        ArrayUtils.remove(execArgs, 0);
-        ArrayUtils.addAll(constructorArgs, execArgs);
-        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
-        streamsProcessor.prepare(null);
-    }
-
-    @Override
-    public DataBag exec(Tuple line) throws IOException {
-
-        if (line == null || line.size() == 0)
-            return null;
-
-        Configuration conf = UDFContext.getUDFContext().getJobConf();
-
-        String id = (String)line.get(0);
-        String provider = (String)line.get(1);
-        Long timestamp = (Long)line.get(2);
-        String object = (String)line.get(3);
-
-        StreamsDatum entry = new StreamsDatum(object);
-
-        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
-        List<Tuple> resultTupleList = Lists.newArrayList();
-
-        for( StreamsDatum resultDatum : resultSet ) {
-            Tuple tuple = mTupleFactory.newTuple();
-            tuple.append(id);
-            tuple.append(provider);
-            tuple.append(timestamp);
-            tuple.append(resultDatum.getDocument());
-            resultTupleList.add(tuple);
-        }
-
-        DataBag result = mBagFactory.newDefaultBag(resultTupleList);
-
-        return result;
-
-    }
-
-    public void finish() {
-        streamsProcessor.cleanUp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 46675cb..67f3c61 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -59,9 +59,7 @@ public class StreamsSerializerExec extends EvalFunc<String> {
         } catch( Exception e ) {
             e.printStackTrace();
         }
-        System.out.println("5");
         Preconditions.checkNotNull(activity);
-        System.out.println("6");
 
         return mapper.writeValueAsString(activity);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
index 6b1511a..054b3af 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
@@ -1,4 +1,4 @@
-DEFINE UNWINDER org.apache.streams.pig.StreamsProcessorExec('org.apache.streams.local.test.processors.DoNothingProcessor');
+DEFINE UNWINDER org.apache.streams.pig.StreamsProcessDatumExec('org.apache.streams.local.test.processors.DoNothingProcessor');
 
 activities = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
new file mode 100644
index 0000000..5bf1d53
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -0,0 +1,22 @@
+package org.apache.streams.util;
+
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 3/31/14.
+ */
+public class ComponentUtils {
+
+    public static void offerUntilSuccess(Object entry, Queue queue) {
+
+        boolean success;
+        do {
+            synchronized( ComponentUtils.class ) {
+                success = queue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
+    }
+
+}