You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 06:49:22 UTC

[07/18] git commit: STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil

STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fa3f9220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fa3f9220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fa3f9220

Branch: refs/heads/master
Commit: fa3f92200d19605e931954a154e59123d1a36f03
Parents: 1163653
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:48:35 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Wed Jul 2 10:48:35 2014 -0500

----------------------------------------------------------------------
 .../processor/InstagramTypeConverter.java       | 116 +++----------------
 .../serializer/util/InstagramActivityUtil.java  |  32 +++--
 2 files changed, 34 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 14260e3..7fb1ec6 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -18,52 +18,34 @@
 
 package org.apache.streams.instagram.processor;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Queue;
 
-/**
- * Created by sblackmon on 12/10/13.
- */
 public class InstagramTypeConverter implements StreamsProcessor {
 
     public final static String STREAMS_ID = "InstagramTypeConverter";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class);
 
-    private ObjectMapper mapper;
-
     private Queue<MediaFeedData> inQueue;
     private Queue<StreamsDatum> outQueue;
 
-    private Class inClass;
-    private Class outClass;
-
-    private InstagramJsonActivitySerializer instagramJsonActivitySerializer;
+    private InstagramActivityUtil instagramActivityUtil;
 
     private int count = 0;
 
     public final static String TERMINATE = new String("TERMINATE");
 
-    public InstagramTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
+    public InstagramTypeConverter() {
     }
 
     public Queue<StreamsDatum> getProcessorOutputQueue() {
@@ -74,100 +56,31 @@ public class InstagramTypeConverter implements StreamsProcessor {
         inQueue = inputQueue;
     }
 
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            LOGGER.debug("ACTIVITY");
-            result = instagramJsonActivitySerializer.deserialize(
-                    mapper.writeValueAsString(event));
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        } else if( outClass.equals( String.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.writeValueAsString(event);
-        }
-
-
-    // no supported conversion were applied
-        if( result != null ) {
-            count ++;
-            return result;
-        }
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
 
         StreamsDatum result = null;
 
         try {
-
             Object item = entry.getDocument();
-            ObjectNode node;
 
             LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
 
-            if( item instanceof String ) {
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass)) {
-                    result = entry;
-                }
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
+            if(item instanceof MediaFeedData) {
+                //We don't need to use the mapper, since we have a process to convert between
+                //MediaFeedData objects and Activity objects already
+                Activity activity = new Activity();
 
-                    Object out = convert(node, String.class, outClass);
+                instagramActivityUtil.updateActivity((MediaFeedData)item, activity);
 
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
+                if(activity.getId() != null) {
+                    result = new StreamsDatum(activity);
+                    count++;
                 }
-
-            } else if( item instanceof ObjectNode ) {
-
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
-
-                Object out = convert(node, ObjectNode.class, outClass);
-
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-
             }
-
         } catch (Exception e) {
             e.printStackTrace();
+            LOGGER.error("Exception while converting MediaFeedData to Activity: {}", e.getMessage());
         }
 
         if( result != null )
@@ -178,13 +91,12 @@ public class InstagramTypeConverter implements StreamsProcessor {
 
     @Override
     public void prepare(Object o) {
-        mapper = new StreamsJacksonMapper();
-        instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+        instagramActivityUtil = new InstagramActivityUtil();
     }
 
     @Override
     public void cleanUp() {
-
+        //noop
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 0561ba7..bd926d5 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -53,7 +53,9 @@ public class InstagramActivityUtil {
      */
     public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException {
         activity.setActor(buildActor(item));
-        activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000));
+
+        if(item.getCreatedTime() != null)
+            activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000));
 
         activity.setId(formatId(activity.getVerb(),
             Optional.fromNullable(
@@ -78,16 +80,20 @@ public class InstagramActivityUtil {
     public static  Actor buildActor(MediaFeedData item) {
         Actor actor = new Actor();
 
-        Image image = new Image();
-        image.setUrl(item.getUser().getProfilePictureUrl());
+        try {
+            Image image = new Image();
+            image.setUrl(item.getUser().getProfilePictureUrl());
 
-        Map<String, Object> extensions = new HashMap<String, Object>();
-        extensions.put("screenName", item.getUser().getUserName());
+            Map<String, Object> extensions = new HashMap<String, Object>();
+            extensions.put("screenName", item.getUser().getUserName());
 
-        actor.setId(formatId(String.valueOf(item.getUser().getId())));
-        actor.setImage(image);
-        actor.setAdditionalProperty("extensions", extensions);
-        actor.setAdditionalProperty("handle", item.getUser().getUserName());
+            actor.setId(formatId(String.valueOf(item.getUser().getId())));
+            actor.setImage(image);
+            actor.setAdditionalProperty("extensions", extensions);
+            actor.setAdditionalProperty("handle", item.getUser().getUserName());
+        } catch (Exception e) {
+            LOGGER.error("Exception trying to build actor object: {}", e.getMessage());
+        }
 
         return actor;
     }
@@ -244,9 +250,11 @@ public class InstagramActivityUtil {
 
         addLocationExtension(activity, item);
 
-        Map<String, Object> likes = new HashMap<String, Object>();
-        likes.put("count", item.getLikes().getCount());
-        extensions.put("likes", likes);
+        if(item.getLikes() != null) {
+            Map<String, Object> likes = new HashMap<String, Object>();
+            likes.put("count", item.getLikes().getCount());
+            extensions.put("likes", likes);
+        }
 
         extensions.put("hashtags", item.getTags());