You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 21:11:47 UTC

[02/28] incubator-streams git commit: omni-bus update

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java b/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
new file mode 100644
index 0000000..ea0055f
--- /dev/null
+++ b/streams-pojo/src/main/java/org/apache/streams/data/DocumentClassifier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.data;
+
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable contract for determining which class a document corresponds to.
+ */
+public interface DocumentClassifier extends Serializable {
+
+    /**
+     * Detects the class that the supplied document matches.
+     *
+     * @param document the document to classify
+     * @return a serializable pojo class this document matches
+     */
+    Class detectClass(Object document);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index 04ee923..b87854f 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -120,4 +120,7 @@ public class ActivityUtil {
         return String.format("id:%s:activities:%s", providerName, activityId);
     }
 
+    /**
+     * Checks whether an activity carries the minimum identifying fields.
+     *
+     * @param activity the activity to inspect; may be null
+     * @return true only when the activity, its id, its verb, its provider and
+     *         the provider's id are all non-null; false otherwise
+     */
+    public static boolean isValid(Activity activity) {
+        return activity != null
+                && activity.getId() != null
+                && activity.getVerb() != null
+                // a missing provider makes the activity invalid instead of throwing a NullPointerException
+                && activity.getProvider() != null
+                && activity.getProvider().getId() != null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index 8a74caa..c85de84 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 
@@ -38,6 +39,13 @@ public class StreamsJacksonMapper extends ObjectMapper {
         return INSTANCE;
     }
 
+    /**
+     * Creates a new mapper for a single format.
+     *
+     * @param format the format handed to the mapper, wrapped in a one-element list
+     * @return a freshly constructed StreamsJacksonMapper (a new object on every call)
+     */
+    public static StreamsJacksonMapper getInstance(String format){
+        return new StreamsJacksonMapper(Lists.newArrayList(format));
+    }
     public static StreamsJacksonMapper getInstance(List<String> formats){
 
         StreamsJacksonMapper instance = new StreamsJacksonMapper(formats);
@@ -52,6 +60,12 @@ public class StreamsJacksonMapper extends ObjectMapper {
         configure();
     }
 
+    /**
+     * Builds a mapper for a single format: registers a StreamsJacksonModule
+     * created from a one-element list holding that format, then runs configure().
+     *
+     * @param format the format passed on to the StreamsJacksonModule
+     */
+    public StreamsJacksonMapper(String format) {
+        // the no-arg superclass constructor is invoked implicitly
+        List<String> singleFormat = Lists.newArrayList(format);
+        registerModule(new StreamsJacksonModule(singleFormat));
+        configure();
+    }
+
     public StreamsJacksonMapper(List<String> formats) {
         super();
         registerModule(new StreamsJacksonModule(formats));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
index e92a5ae..48207d5 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java
@@ -20,17 +20,10 @@
 package org.apache.streams.pig;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.data.ActivityConverter;
 import org.slf4j.Logger;
 
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 /**
  * Static reflection wrappers for instantiating StreamsComponents
  */
@@ -38,7 +31,7 @@ public class StreamsComponentFactory {
 
     private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class);
 
-    public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) {
+    public static ActivityConverter getSerializerInstance(Class<?> serializerClazz) {
 
         Object object = null;
         try {
@@ -49,7 +42,7 @@ public class StreamsComponentFactory {
 
         Preconditions.checkNotNull(object);
 
-        ActivitySerializer serializer = (ActivitySerializer) object;
+        ActivityConverter serializer = (ActivityConverter) object;
 
         return serializer;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 788b347..2303d52 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -20,24 +20,13 @@
 package org.apache.streams.pig;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import datafu.pig.util.SimpleEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index d517752..65b7956 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -21,26 +21,14 @@ package org.apache.streams.pig;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import datafu.pig.util.SimpleEvalFunc;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.UDFContext;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -53,7 +41,7 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> {
 
     private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsSerializerExec.class);
 
-    ActivitySerializer activitySerializer;
+    ActivityConverter activitySerializer;
     ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     public StreamsSerializerExec(String... execArgs) throws ClassNotFoundException{

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
index e643cb6..475d791 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java
@@ -21,15 +21,9 @@ package org.apache.streams.pig.test;
 
 import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
 import org.apache.tools.ant.util.StringUtils;
-import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.text.ParseException;
 import java.util.List;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
index 556ea3a..dd30eb1 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java
@@ -21,9 +21,6 @@ package org.apache.streams.pig.test;
 
 import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
 import org.apache.tools.ant.util.StringUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aad0e887/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java
index a7ad4a0..b53083a 100644
--- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java
+++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java
@@ -19,22 +19,14 @@
 
 package org.apache.streams.pig.test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.pigunit.PigTest;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
-import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
+import org.apache.streams.twitter.serializer.TwitterJsonActivityConverter;
 import org.apache.tools.ant.util.StringUtils;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.Iterator;
-
 /**
  * This is a test for StreamsSerializerExec
  */
@@ -47,7 +39,7 @@ public class PigSerializerTest {
                 "159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create
 d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1
 700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contr
 ibutors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retwe
 et_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\"
 https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}"
         };
 
-        TwitterJsonActivitySerializer serializer = new TwitterJsonActivitySerializer();
+        TwitterJsonActivityConverter serializer = new TwitterJsonActivityConverter();
 
         String doc = (String) StringUtils.split(input[0], '\t').get(3);
         String outdoc = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)).writeValueAsString(serializer.deserialize(doc));