You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/16 01:56:25 UTC

[07/10] incubator-streams git commit: improved error handling; serialize twitter type support classes

improved error handling
serialize twitter type support classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/93026a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/93026a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/93026a7c

Branch: refs/heads/STREAMS-68,218
Commit: 93026a7c2110df4ff441d2732f68085ebf933d53
Parents: 8737bda
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sat Nov 15 12:51:11 2014 -0600
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sat Nov 15 12:51:50 2014 -0600

----------------------------------------------------------------------
 .../converter/ActivityConverterProcessor.java   | 23 ++++++++++++++--
 .../converter/TypeConverterProcessor.java       |  5 ++--
 .../streams-provider-twitter/pom.xml            |  6 +++-
 .../serializer/TwitterConverterResolver.java    | 29 ++++++--------------
 .../serializer/TwitterDocumentClassifier.java   | 11 +++++---
 5 files changed, 43 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
index 46ba788..629f979 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
@@ -74,6 +74,9 @@ public class ActivityConverterProcessor extends TypeConverterProcessor {
                     break;
             }
 
+            //Preconditions.checkNotNull(datumClass);
+            if( datumClass == null) return result;
+
             // This implementation is primitive, greedy, takes first it can resolve
             Class converterClass = null;
             for( ActivityConverterResolver resolver : resolvers ) {
@@ -82,25 +85,39 @@ public class ActivityConverterProcessor extends TypeConverterProcessor {
                     break;
             }
 
+            //Preconditions.checkNotNull(converterClass);
+            if( converterClass == null) return result;
+
             ActivityConverter converter = ActivityConverterFactory.getInstance(converterClass);
 
+            //Preconditions.checkNotNull(converter);
+            if( converter == null) return result;
+
             Object typedDoc;
             if( datumClass.isInstance(inDoc) )
                 typedDoc = inDoc;
             else
                 typedDoc = TypeConverterUtil.convert(inDoc, datumClass, mapper);
 
+            //Preconditions.checkNotNull(typedDoc);
+            if( typedDoc == null) return result;
+
             Activity activity = converter.deserialize(typedDoc);
 
+            //Preconditions.checkNotNull(activity);
+            if( activity == null) return result;
+
             entry.setDocument(activity);
 
             result.add(entry);
 
-        } catch( Throwable e ) {
-            LOGGER.warn("Unable to serialize!", e.getMessage());
+        } catch( Exception e ) {
+            LOGGER.warn("Unable to serialize!  " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            return result;
         }
 
-        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
index 6bfeead..5ff5567 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
@@ -19,7 +19,6 @@ under the License.
 package org.apache.streams.converter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
@@ -27,13 +26,13 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 /**
  *
  */
-public class TypeConverterProcessor implements StreamsProcessor {
+public class TypeConverterProcessor implements StreamsProcessor, Serializable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 1c0e8cb..660afbc 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -80,7 +80,11 @@
             <artifactId>hbc-core</artifactId>
             <version>2.1.0</version>
         </dependency>
-
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>1.2</version>
+        </dependency>
         <dependency>
             <groupId>org.twitter4j</groupId>
             <artifactId>twitter4j-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
index f14c9f5..5a58e42 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
@@ -18,25 +18,9 @@
 
 package org.apache.streams.twitter.serializer;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.ActivityConverterResolver;
-import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-
-import java.io.IOException;
-import java.io.Serializable;
+import org.apache.streams.twitter.pojo.*;
 
 /**
  * Created by sblackmon on 12/13/13.
@@ -50,10 +34,12 @@ public class TwitterConverterResolver implements ActivityConverterResolver {
     private static TwitterConverterResolver instance = new TwitterConverterResolver();
 
     public static TwitterConverterResolver getInstance() {
+
+        if( instance == null )
+            instance = new TwitterConverterResolver();
         return instance;
-    }
 
-    private static ObjectMapper mapper = new StreamsJacksonMapper(StreamsTwitterMapper.TWITTER_FORMAT);
+    }
 
     @Override
     public Class bestSerializer(Class documentClass) throws ActivitySerializerException {
@@ -66,7 +52,10 @@ public class TwitterConverterResolver implements ActivityConverterResolver {
             return TwitterJsonUserActivityConverter.class;
         else if (documentClass == UserstreamEvent.class)
             return TwitterJsonUserstreameventActivityConverter.class;
-        else return TwitterJsonTweetActivityConverter.class;
+        else if (documentClass == FriendList.class)
+            return null;
+        else
+            return TwitterJsonTweetActivityConverter.class;
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
index b9ca789..1719491 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
@@ -23,13 +23,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.*;
 
 import java.io.IOException;
-import java.io.Serializable;
 
 /**
  * Created by sblackmon on 12/13/13.
@@ -40,13 +38,16 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
 
     }
 
-    private static TwitterDocumentClassifier instance = new TwitterDocumentClassifier();
+    private static TwitterDocumentClassifier instance;
 
     public static TwitterDocumentClassifier getInstance() {
+
+        if( instance == null )
+            instance = new TwitterDocumentClassifier();
         return instance;
     }
 
-    private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+    private static ObjectMapper mapper;
 
     public Class detectClass(Object document) {
 
@@ -56,6 +57,8 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
         String json = (String)document;
         Preconditions.checkArgument(StringUtils.isNotEmpty(json));
 
+        mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
         ObjectNode objectNode;
         try {
             objectNode = (ObjectNode) mapper.readTree(json);