You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/23 20:22:21 UTC

[17/45] git commit: Removed commented code per request

Removed commented code per request


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7337b09b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7337b09b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7337b09b

Branch: refs/heads/master
Commit: 7337b09b63c5507131bb8ee9c709a3b77a93260a
Parents: 997c691
Author: rebanks <re...@w2odigital.com>
Authored: Mon Jun 16 11:56:05 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Mon Jun 16 11:56:05 2014 -0500

----------------------------------------------------------------------
 .../DatasiftTypeConverterProcessor.java         | 25 ++++++++++----------
 .../DatasiftTweetActivitySerializer.java        |  2 +-
 2 files changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7337b09b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
index 33439ba..d04c04b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java
@@ -21,8 +21,7 @@ package org.apache.streams.datasift.provider;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.*;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -35,7 +34,7 @@ import java.util.List;
 /**
  *
  */
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
+public class DatasiftTypeConverterProcessor implements StreamsProcessor,DatumStatusCountable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
 
@@ -43,6 +42,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
     private Class outClass;
     private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
     private DatasiftConverter converter;
+    private DatumStatusCounter counter;
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -52,6 +52,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
+        this.counter.incrementAttempt();
         List<StreamsDatum> result = Lists.newLinkedList();
         Object doc;
         try {
@@ -61,7 +62,9 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
             }
         } catch (Exception e) {
             LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
+            this.counter.incrementStatus(DatumStatus.FAIL);
         }
+        this.counter.incrementStatus(DatumStatus.SUCCESS);
         return result;
     }
 
@@ -77,6 +80,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
             LOGGER.warn("Using defaulting datasift converter");
             this.converter = new DefaultConverter(this.outClass);
         }
+        this.counter = new DatumStatusCounter();
     }
 
     @Override
@@ -84,6 +88,11 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
 
     }
 
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return this.counter;
+    }
+
     private class ActivityConverter implements DatasiftConverter {
 
         @Override
@@ -109,17 +118,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
         public Object convert(Object toConvert, ObjectMapper mapper) {
             try {
                 if(toConvert instanceof String){
-//                    LOGGER.debug(mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class)));
                     return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
                 } else {
                     if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
                         ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
                         if(node.has("additionalProperties")) {
                             ObjectNode additionalProperties = (ObjectNode) node.get("additionalProperties");
-//                            node.put("keywords", additionalProperties.get("keywords"));
-//                            node.put("location", additionalProperties.get("location"));
-//                            node.put("hashtags", additionalProperties.get("hashtags"));
-//                            node.put("datasift", additionalProperties.get("datasift"));
 //                            node.put("user_mentions", additionalProperties.get("user_mentions"));
                             node.putAll(additionalProperties);
                             node.remove("additionalProperties");
@@ -128,11 +132,6 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor {
                             ObjectNode actor = (ObjectNode) node.get("actor");
                             if(actor.has("additionalProperties")) {
                                 ObjectNode additionalProperties = (ObjectNode) actor.get("additionalProperties");
-//                                actor.put("followers", additionalProperties.get("followers"));
-//                                actor.put("location", additionalProperties.get("location"));
-//                                actor.put("screenName", additionalProperties.get("screenName"));
-//                                actor.put("posts", additionalProperties.get("posts"));
-//                                actor.put("favorites", additionalProperties.get("favorties"));
                                 actor.putAll(additionalProperties);
                                 actor.remove("additionalProperties");
                             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7337b09b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index e2394ce..f5c4d12 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -42,7 +42,7 @@ import java.util.Map;
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 
 /**
- * 
+ *
  */
 public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer {