You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/04/01 18:44:10 UTC
git commit: attempting to fix Jackson date deserialization
Repository: incubator-streams
Updated Branches:
refs/heads/springcleaning cd4355ae4 -> a93bb176d
attempting to fix Jackson date deserialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a93bb176
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a93bb176
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a93bb176
Branch: refs/heads/springcleaning
Commit: a93bb176d1c17322b860966553371fe3fefd5596
Parents: cd4355a
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Apr 1 11:43:48 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Apr 1 11:43:48 2014 -0500
----------------------------------------------------------------------
.../streams/twitter/test/SimpleTweetTest.java | 94 ++++++++++++++++++++
.../apache/streams/data/util/RFC3339Utils.java | 7 ++
.../jackson/StreamsDateTimeDeserializer.java | 2 +-
.../jackson/StreamsDateTimeSerializer.java | 2 +-
.../streams/pig/StreamsProcessDatumExec.java | 81 +++++++++++++++++
.../streams/pig/StreamsProcessDocumentExec.java | 83 +++++++++++++++++
.../streams/pig/StreamsProcessorExec.java | 81 -----------------
.../streams/pig/StreamsSerializerExec.java | 2 -
.../src/test/resources/pigprocessortest.pig | 2 +-
.../org/apache/streams/util/ComponentUtils.java | 22 +++++
10 files changed, 290 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
new file mode 100644
index 0000000..8988de0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -0,0 +1,94 @@
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.provider.TwitterEventClassifier;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Smoke test for Twitter JSON handling: parses one retweet payload with the Streams
+ * Twitter mapper, converts it to a {@link Tweet}, and runs it through
+ * {@link TwitterJsonActivitySerializer} to check that a populated {@link Activity} results.
+ */
+public class SimpleTweetTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class);
+    private ObjectMapper mapper = StreamsTwitterMapper.getInstance();
+
+    // A single retweet payload (with a nested retweeted_status) used as the fixture for every assertion below.
+    private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_coun
t\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_us
e_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UN
DERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. Submission Email: ughhblog@gmail.com \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.tw
img.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76
371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}";
+
+    private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+
+    /**
+     * Reads {@link #TWITTER_JSON} as a JSON tree, as a {@link Tweet} POJO and as an
+     * {@link Activity}, asserting that the key fields of each result are populated.
+     */
+    @Test
+    public void Tests()
+    {
+        // NOTE(review): StreamsTwitterMapper.getInstance() may return a shared mapper; if so,
+        // these settings also affect other users of it in the same JVM — confirm.
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        ObjectNode event = null;
+        try {
+            event = (ObjectNode) mapper.readTree(TWITTER_JSON);
+        } catch (IOException e) {
+            LOGGER.error("Could not parse TWITTER_JSON", e);
+            Assert.fail("Could not parse TWITTER_JSON: " + e);
+        }
+
+        assertThat(event, is(not(nullValue())));
+
+        Tweet tweet = mapper.convertValue(event, Tweet.class);
+
+        assertThat(tweet, is(not(nullValue())));
+        assertThat(tweet.getCreatedAt(), is(not(nullValue())));
+        assertThat(tweet.getText(), is(not(nullValue())));
+        assertThat(tweet.getUser(), is(not(nullValue())));
+
+        Activity activity = null;
+        try {
+            activity = twitterJsonActivitySerializer.deserialize(TWITTER_JSON);
+        } catch (ActivitySerializerException e) {
+            LOGGER.error("Could not convert TWITTER_JSON to an Activity", e);
+            Assert.fail("Could not convert TWITTER_JSON to an Activity: " + e);
+        }
+
+        assertThat(activity, is(not(nullValue())));
+
+        assertThat(activity.getId(), is(not(nullValue())));
+        assertThat(activity.getActor(), is(not(nullValue())));
+        assertThat(activity.getActor().getId(), is(not(nullValue())));
+        assertThat(activity.getVerb(), is(not(nullValue())));
+        assertThat(activity.getProvider(), is(not(nullValue())));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
index 7af16c4..473d6bb 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/RFC3339Utils.java
@@ -30,6 +30,13 @@ import java.util.regex.Pattern;
* Parses and formats Joda Time {@link org.joda.time.DateTime} dates to and from RFC3339 compatible Strings
*/
public class RFC3339Utils {
+
+ // Shared instance handed out by getInstance().
+ // NOTE(review): if parseUTC/format are static methods, calling them through this
+ // instance is equivalent to calling them on the class — confirm the singleton is needed.
+ // NOTE(review): this field is initialized before the static fields declared below it;
+ // any of those that are not compile-time constants (e.g. a compiled Pattern) are still
+ // null while this constructor runs — confirm the constructor does not depend on them.
+ private static final RFC3339Utils INSTANCE = new RFC3339Utils();
+
+ /**
+ * Returns the shared {@link RFC3339Utils} instance.
+ *
+ * @return the singleton instance; never null
+ */
+ public static RFC3339Utils getInstance(){
+ return INSTANCE;
+ }
+
private static final String BASE = "^[0-9]{4}\\-[0-9]{2}\\-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}";
private static final String TZ = "[+-][0-9]{2}:?[0-9]{2}$";
private static final String SUB_SECOND = "\\.([0-9]*)";
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
index 8d56c53..3286e74 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java
@@ -19,6 +19,6 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> {
+ /**
+ * Parses the current token's string value with {@link RFC3339Utils#parseUTC}.
+ */
@Override
public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
- return RFC3339Utils.parseUTC(jpar.getValueAsString());
+ // NOTE(review): if parseUTC is static, this call is equivalent to invoking it on the class.
+ // Twitter created_at values (e.g. "Wed Dec 11 22:27:34 +0000 2013") are not RFC 3339;
+ // confirm parseUTC accepts them if this deserializer is applied to Twitter payloads.
+ return RFC3339Utils.getInstance().parseUTC(jpar.getValueAsString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
index 4677fce..26bc157 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java
@@ -23,6 +23,6 @@ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> {
+ /**
+ * Writes {@code value} as a JSON string produced by {@link RFC3339Utils#format}.
+ */
@Override
public void serialize(DateTime value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
- jgen.writeString(RFC3339Utils.format(value));
+ // NOTE(review): if format is static, this call is equivalent to invoking it on the class.
+ jgen.writeString(RFC3339Utils.getInstance().format(value));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
new file mode 100644
index 0000000..542c106
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -0,0 +1,81 @@
+package org.apache.streams.pig;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pig UDF that runs each input row through a {@link StreamsProcessor} and returns a bag
+ * with one tuple per {@link StreamsDatum} the processor emits.
+ *
+ * <p>Expected input tuple: (id: chararray, provider: chararray, timestamp: long,
+ * document: chararray). Each output tuple repeats the input id, provider and timestamp
+ * and appends the processed document.
+ *
+ * <p>Example:
+ * {@code DEFINE UNWINDER org.apache.streams.pig.StreamsProcessDatumExec('org.apache.streams.local.test.processors.DoNothingProcessor');}
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
+public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
+
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    StreamsProcessor streamsProcessor;
+
+    /**
+     * Loads and prepares the processor named by the first argument.
+     *
+     * @param execArgs {@code execArgs[0]} is the fully-qualified {@link StreamsProcessor}
+     *                 class name; any further arguments are ignored (not forwarded)
+     * @throws ClassNotFoundException if the processor class cannot be loaded
+     */
+    public StreamsProcessDatumExec(String... execArgs) throws ClassNotFoundException{
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
+        String classFullName = execArgs[0];
+        Preconditions.checkNotNull(classFullName);
+        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
+        streamsProcessor.prepare(null);
+    }
+
+    /**
+     * Processes one row.
+     *
+     * @param line input tuple (see the class documentation for the expected fields)
+     * @return a bag of (id, provider, timestamp, document) tuples, one per processor result
+     *         (empty if the processor returns no results); {@code null} when {@code line}
+     *         is null or empty
+     * @throws IOException if reading a field of {@code line} fails
+     */
+    @Override
+    public DataBag exec(Tuple line) throws IOException {
+
+        if (line == null || line.size() == 0) {
+            return null;
+        }
+
+        String id = (String) line.get(0);
+        String provider = (String) line.get(1);
+        Long timestamp = (Long) line.get(2);
+        String object = (String) line.get(3);
+
+        List<StreamsDatum> resultSet = streamsProcessor.process(new StreamsDatum(object));
+        List<Tuple> resultTupleList = Lists.newArrayList();
+
+        // Treat a null result list like an empty one instead of failing with an NPE.
+        if (resultSet != null) {
+            for (StreamsDatum resultDatum : resultSet) {
+                Tuple tuple = mTupleFactory.newTuple();
+                tuple.append(id);
+                tuple.append(provider);
+                tuple.append(timestamp);
+                tuple.append(resultDatum.getDocument());
+                resultTupleList.add(tuple);
+            }
+        }
+
+        return mBagFactory.newDefaultBag(resultTupleList);
+    }
+
+    /** Cleans up the wrapped processor. */
+    public void finish() {
+        streamsProcessor.cleanUp();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
new file mode 100644
index 0000000..469aa3f
--- /dev/null
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -0,0 +1,83 @@
+package org.apache.streams.pig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pig UDF that runs one document through a {@link StreamsProcessor} and returns the
+ * processed document as a string.
+ *
+ * <p>Expected input tuple: (document: chararray). If the processor emits more than one
+ * datum, only the last datum's document is returned.
+ */
+@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
+public class StreamsProcessDocumentExec extends EvalFunc<String> {
+
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    StreamsProcessor streamsProcessor;
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /**
+     * Loads and prepares the processor named by the first argument.
+     *
+     * @param execArgs {@code execArgs[0]} is the fully-qualified {@link StreamsProcessor}
+     *                 class name; any further arguments are ignored
+     * @throws ClassNotFoundException if the processor class cannot be loaded
+     */
+    public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
+        Preconditions.checkNotNull(execArgs);
+        Preconditions.checkArgument(execArgs.length > 0);
+        String processorFullName = execArgs[0];
+        Preconditions.checkNotNull(processorFullName);
+        streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(processorFullName));
+        streamsProcessor.prepare(null);
+    }
+
+    /**
+     * Processes one document.
+     *
+     * @param input a single-field tuple holding the document string
+     * @return the last processed document: returned as-is when it is a String, otherwise
+     *         serialized to JSON with the Streams Jackson mapper
+     * @throws NullPointerException if the input, its document, the processor's result list,
+     *         or the resulting document is null (including when the processor emits nothing)
+     * @throws IllegalArgumentException if {@code input} does not have exactly one field
+     * @throws IOException if reading the tuple or serializing the result fails
+     */
+    @Override
+    public String exec(Tuple input) throws IOException {
+
+        Preconditions.checkNotNull(streamsProcessor);
+        Preconditions.checkNotNull(input);
+        Preconditions.checkArgument(input.size() == 1);
+
+        String document = (String) input.get(0);
+        Preconditions.checkNotNull(document);
+
+        List<StreamsDatum> resultSet = streamsProcessor.process(new StreamsDatum(document));
+        Preconditions.checkNotNull(resultSet, "processor returned a null result list");
+
+        // This UDF returns a single value, so only the last emitted datum is kept.
+        Object resultDoc = null;
+        for (StreamsDatum resultDatum : resultSet) {
+            resultDoc = resultDatum.getDocument();
+        }
+        Preconditions.checkNotNull(resultDoc, "processor produced no document");
+
+        if (resultDoc instanceof String) {
+            return (String) resultDoc;
+        }
+        // Any other document (e.g. an ObjectNode or an Activity POJO) is serialized to JSON
+        // rather than being dropped as null.
+        return mapper.writeValueAsString(resultDoc);
+    }
+
+    /** Cleans up the wrapped processor. */
+    public void finish() {
+        streamsProcessor.cleanUp();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
deleted file mode 100644
index addded4..0000000
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessorExec.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.streams.pig;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
-import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by sblackmon on 3/25/14.
- */
-@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessorExec extends EvalFunc<DataBag> {
-
- TupleFactory mTupleFactory = TupleFactory.getInstance();
- BagFactory mBagFactory = BagFactory.getInstance();
-
- StreamsProcessor streamsProcessor;
-
- public StreamsProcessorExec(String... execArgs) throws ClassNotFoundException{
- Preconditions.checkNotNull(execArgs);
- Preconditions.checkArgument(execArgs.length > 0);
- String classFullName = execArgs[0];
- Preconditions.checkNotNull(classFullName);
- String[] constructorArgs = new String[execArgs.length-1];
- ArrayUtils.remove(execArgs, 0);
- ArrayUtils.addAll(constructorArgs, execArgs);
- streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
- streamsProcessor.prepare(null);
- }
-
- @Override
- public DataBag exec(Tuple line) throws IOException {
-
- if (line == null || line.size() == 0)
- return null;
-
- Configuration conf = UDFContext.getUDFContext().getJobConf();
-
- String id = (String)line.get(0);
- String provider = (String)line.get(1);
- Long timestamp = (Long)line.get(2);
- String object = (String)line.get(3);
-
- StreamsDatum entry = new StreamsDatum(object);
-
- List<StreamsDatum> resultSet = streamsProcessor.process(entry);
- List<Tuple> resultTupleList = Lists.newArrayList();
-
- for( StreamsDatum resultDatum : resultSet ) {
- Tuple tuple = mTupleFactory.newTuple();
- tuple.append(id);
- tuple.append(provider);
- tuple.append(timestamp);
- tuple.append(resultDatum.getDocument());
- resultTupleList.add(tuple);
- }
-
- DataBag result = mBagFactory.newDefaultBag(resultTupleList);
-
- return result;
-
- }
-
- public void finish() {
- streamsProcessor.cleanUp();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 46675cb..67f3c61 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -59,9 +59,7 @@ public class StreamsSerializerExec extends EvalFunc<String> {
} catch( Exception e ) {
e.printStackTrace();
}
- System.out.println("5");
Preconditions.checkNotNull(activity);
- System.out.println("6");
return mapper.writeValueAsString(activity);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
index 6b1511a..054b3af 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
+++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigprocessortest.pig
@@ -1,4 +1,4 @@
-DEFINE UNWINDER org.apache.streams.pig.StreamsProcessorExec('org.apache.streams.local.test.processors.DoNothingProcessor');
+DEFINE UNWINDER org.apache.streams.pig.StreamsProcessDatumExec('org.apache.streams.local.test.processors.DoNothingProcessor');
activities = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a93bb176/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
new file mode 100644
index 0000000..5bf1d53
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -0,0 +1,22 @@
+package org.apache.streams.util;
+
+import java.util.Queue;
+
+/**
+ * Static helpers shared by Streams components.
+ */
+public class ComponentUtils {
+
+    /**
+     * Offers {@code entry} to {@code queue}, retrying until the queue accepts it.
+     *
+     * <p>While the queue rejects the entry this method spins, calling {@link Thread#yield()}
+     * between attempts; it never gives up and does not check for interruption. Each
+     * {@code offer} call is made while holding the {@code ComponentUtils.class} monitor, so
+     * concurrent calls to this method are serialized across all queues.
+     *
+     * @param entry the element to enqueue
+     * @param queue the destination queue
+     */
+    @SuppressWarnings("unchecked")
+    public static void offerUntilSuccess(Object entry, Queue queue) {
+        while (true) {
+            boolean accepted;
+            synchronized (ComponentUtils.class) {
+                accepted = queue.offer(entry);
+            }
+            if (accepted) {
+                return;
+            }
+            // Rejected (e.g. a full bounded queue): give other threads a chance to run before retrying.
+            Thread.yield();
+        }
+    }
+
+}