You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 20:22:21 UTC
[17/45] git commit: Removed commented-out code per request
Removed commented-out code per request
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7337b09b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7337b09b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7337b09b
Branch: refs/heads/master
Commit: 7337b09b63c5507131bb8ee9c709a3b77a93260a
Parents: 997c691
Author: rebanks <re...@w2odigital.com>
Authored: Mon Jun 16 11:56:05 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Mon Jun 16 11:56:05 2014 -0500
----------------------------------------------------------------------
.../DatasiftTypeConverterProcessor.java | 25 ++++++++++----------
.../DatasiftTweetActivitySerializer.java | 2 +-
2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7337b09b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
index 33439ba..d04c04b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -21,8 +21,7 @@ package org.apache.streams.datasift.provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.*;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -35,7 +34,7 @@ import java.util.List;
/**
*
*/
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
+public class DatasiftTypeConverterProcessor implements StreamsProcessor,DatumStatusCountable {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
@@ -43,6 +42,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
private Class outClass;
private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
private DatasiftConverter converter;
+ private DatumStatusCounter counter;
public final static String TERMINATE = new String("TERMINATE");
@@ -52,6 +52,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
+ this.counter.incrementAttempt();
List<StreamsDatum> result = Lists.newLinkedList();
Object doc;
try {
@@ -61,7 +62,9 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
}
} catch (Exception e) {
LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
+ this.counter.incrementStatus(DatumStatus.FAIL);
}
+ this.counter.incrementStatus(DatumStatus.SUCCESS);
return result;
}
@@ -77,6 +80,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
LOGGER.warn("Using defaulting datasift converter");
this.converter = new DefaultConverter(this.outClass);
}
+ this.counter = new DatumStatusCounter();
}
@Override
@@ -84,6 +88,11 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
}
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ return this.counter;
+ }
+
private class ActivityConverter implements DatasiftConverter {
@Override
@@ -109,17 +118,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
public Object convert(Object toConvert, ObjectMapper mapper) {
try {
if(toConvert instanceof String){
-// LOGGER.debug(mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class)));
return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
} else {
if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
if(node.has("additionalProperties")) {
ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
-// node.put("keywords", additionalProperties.get("keywords"));
-// node.put("location", additionalProperties.get("location"));
-// node.put("hashtags", additionalProperties.get("hashtags"));
-// node.put("datasift", additionalProperties.get("datasift"));
// node.put("user_mentions", additionalProperties.get("user_mentions"));
node.putAll(additionalProperties);
node.remove("additionalProperties");
@@ -128,11 +132,6 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
ObjectNode actor = (ObjectNode) node.get("actor");
if(actor.has("additionalProperties")) {
ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
-// actor.put("followers", additionalProperties.get("followers"));
-// actor.put("location", additionalProperties.get("location"));
-// actor.put("screenName", additionalProperties.get("screenName"));
-// actor.put("posts", additionalProperties.get("posts"));
-// actor.put("favorites", additionalProperties.get("favorties"));
actor.putAll(additionalProperties);
actor.remove("additionalProperties");
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7337b09b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index e2394ce..f5c4d12 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -42,7 +42,7 @@ import java.util.Map;
import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
/**
- *
+ *
*/
public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer {