You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/23 21:11:56 UTC
[11/28] incubator-streams git commit: improved error handling
serialize twitter type support classes
improved error handling
serialize twitter type support classes
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/93026a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/93026a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/93026a7c
Branch: refs/heads/master
Commit: 93026a7c2110df4ff441d2732f68085ebf933d53
Parents: 8737bda
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sat Nov 15 12:51:11 2014 -0600
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sat Nov 15 12:51:50 2014 -0600
----------------------------------------------------------------------
.../converter/ActivityConverterProcessor.java | 23 ++++++++++++++--
.../converter/TypeConverterProcessor.java | 5 ++--
.../streams-provider-twitter/pom.xml | 6 +++-
.../serializer/TwitterConverterResolver.java | 29 ++++++--------------
.../serializer/TwitterDocumentClassifier.java | 11 +++++---
5 files changed, 43 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
index 46ba788..629f979 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/ActivityConverterProcessor.java
@@ -74,6 +74,9 @@ public class ActivityConverterProcessor extends TypeConverterProcessor {
break;
}
+ //Preconditions.checkNotNull(datumClass);
+ if( datumClass == null) return result;
+
// This implementation is primitive, greedy, takes first it can resolve
Class converterClass = null;
for( ActivityConverterResolver resolver : resolvers ) {
@@ -82,25 +85,39 @@ public class ActivityConverterProcessor extends TypeConverterProcessor {
break;
}
+ //Preconditions.checkNotNull(converterClass);
+ if( converterClass == null) return result;
+
ActivityConverter converter = ActivityConverterFactory.getInstance(converterClass);
+ //Preconditions.checkNotNull(converter);
+ if( converter == null) return result;
+
Object typedDoc;
if( datumClass.isInstance(inDoc) )
typedDoc = inDoc;
else
typedDoc = TypeConverterUtil.convert(inDoc, datumClass, mapper);
+ //Preconditions.checkNotNull(typedDoc);
+ if( typedDoc == null) return result;
+
Activity activity = converter.deserialize(typedDoc);
+ //Preconditions.checkNotNull(activity);
+ if( activity == null) return result;
+
entry.setDocument(activity);
result.add(entry);
- } catch( Throwable e ) {
- LOGGER.warn("Unable to serialize!", e.getMessage());
+ } catch( Exception e ) {
+ LOGGER.warn("Unable to serialize! " + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ return result;
}
- return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
index 6bfeead..5ff5567 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
@@ -19,7 +19,6 @@ under the License.
package org.apache.streams.converter;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
@@ -27,13 +26,13 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
/**
*
*/
-public class TypeConverterProcessor implements StreamsProcessor {
+public class TypeConverterProcessor implements StreamsProcessor, Serializable {
private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 1c0e8cb..660afbc 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -80,7 +80,11 @@
<artifactId>hbc-core</artifactId>
<version>2.1.0</version>
</dependency>
-
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.2</version>
+ </dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
index f14c9f5..5a58e42 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterConverterResolver.java
@@ -18,25 +18,9 @@
package org.apache.streams.twitter.serializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivityConverter;
import org.apache.streams.data.ActivityConverterResolver;
-import org.apache.streams.data.DocumentClassifier;
import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.FriendList;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.twitter.pojo.UserstreamEvent;
-
-import java.io.IOException;
-import java.io.Serializable;
+import org.apache.streams.twitter.pojo.*;
/**
* Created by sblackmon on 12/13/13.
@@ -50,10 +34,12 @@ public class TwitterConverterResolver implements ActivityConverterResolver {
private static TwitterConverterResolver instance = new TwitterConverterResolver();
public static TwitterConverterResolver getInstance() {
+
+ if( instance == null )
+ instance = new TwitterConverterResolver();
return instance;
- }
- private static ObjectMapper mapper = new StreamsJacksonMapper(StreamsTwitterMapper.TWITTER_FORMAT);
+ }
@Override
public Class bestSerializer(Class documentClass) throws ActivitySerializerException {
@@ -66,7 +52,10 @@ public class TwitterConverterResolver implements ActivityConverterResolver {
return TwitterJsonUserActivityConverter.class;
else if (documentClass == UserstreamEvent.class)
return TwitterJsonUserstreameventActivityConverter.class;
- else return TwitterJsonTweetActivityConverter.class;
+ else if (documentClass == FriendList.class)
+ return null;
+ else
+ return TwitterJsonTweetActivityConverter.class;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/93026a7c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
index b9ca789..1719491 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterDocumentClassifier.java
@@ -23,13 +23,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
-import org.apache.streams.data.ActivityConverter;
import org.apache.streams.data.DocumentClassifier;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.pojo.*;
import java.io.IOException;
-import java.io.Serializable;
/**
* Created by sblackmon on 12/13/13.
@@ -40,13 +38,16 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
}
- private static TwitterDocumentClassifier instance = new TwitterDocumentClassifier();
+ private static TwitterDocumentClassifier instance;
public static TwitterDocumentClassifier getInstance() {
+
+ if( instance == null )
+ instance = new TwitterDocumentClassifier();
return instance;
}
- private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+ private static ObjectMapper mapper;
public Class detectClass(Object document) {
@@ -56,6 +57,8 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
String json = (String)document;
Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+ mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT));
+
ObjectNode objectNode;
try {
objectNode = (ObjectNode) mapper.readTree(json);