You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 06:49:22 UTC
[07/18] git commit: STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil
STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fa3f9220
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fa3f9220
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fa3f9220
Branch: refs/heads/master
Commit: fa3f92200d19605e931954a154e59123d1a36f03
Parents: 1163653
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:48:35 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Wed Jul 2 10:48:35 2014 -0500
----------------------------------------------------------------------
.../processor/InstagramTypeConverter.java | 116 +++----------------
.../serializer/util/InstagramActivityUtil.java | 32 +++--
2 files changed, 34 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 14260e3..7fb1ec6 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -18,52 +18,34 @@
package org.apache.streams.instagram.processor;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
import org.apache.streams.pojo.json.Activity;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
import java.util.Queue;
-/**
- * Created by sblackmon on 12/10/13.
- */
public class InstagramTypeConverter implements StreamsProcessor {
public final static String STREAMS_ID = "InstagramTypeConverter";
private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class);
- private ObjectMapper mapper;
-
private Queue<MediaFeedData> inQueue;
private Queue<StreamsDatum> outQueue;
- private Class inClass;
- private Class outClass;
-
- private InstagramJsonActivitySerializer instagramJsonActivitySerializer;
+ private InstagramActivityUtil instagramActivityUtil;
private int count = 0;
public final static String TERMINATE = new String("TERMINATE");
- public InstagramTypeConverter(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
+ public InstagramTypeConverter() {
}
public Queue<StreamsDatum> getProcessorOutputQueue() {
@@ -74,100 +56,31 @@ public class InstagramTypeConverter implements StreamsProcessor {
inQueue = inputQueue;
}
- public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- LOGGER.debug("ACTIVITY");
- result = instagramJsonActivitySerializer.deserialize(
- mapper.writeValueAsString(event));
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- } else if( outClass.equals( String.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.writeValueAsString(event);
- }
-
-
- // no supported conversion were applied
- if( result != null ) {
- count ++;
- return result;
- }
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
@Override
public List<StreamsDatum> process(StreamsDatum entry) {
StreamsDatum result = null;
try {
-
Object item = entry.getDocument();
- ObjectNode node;
LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
- if( item instanceof String ) {
-
- // if the target is string, just pass-through
- if( String.class.equals(outClass)) {
- result = entry;
- }
- else {
- // first check for valid json
- node = (ObjectNode)mapper.readTree((String)item);
+ if(item instanceof MediaFeedData) {
+ //We don't need to use the mapper, since we have a process to convert between
+ //MediaFeedData objects and Activity objects already
+ Activity activity = new Activity();
- Object out = convert(node, String.class, outClass);
+ instagramActivityUtil.updateActivity((MediaFeedData)item, activity);
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
+ if(activity.getId() != null) {
+ result = new StreamsDatum(activity);
+ count++;
}
-
- } else if( item instanceof ObjectNode ) {
-
- // first check for valid json
- node = (ObjectNode)mapper.valueToTree(item);
-
- Object out = convert(node, ObjectNode.class, outClass);
-
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
-
}
-
} catch (Exception e) {
e.printStackTrace();
+ LOGGER.error("Exception while converting MediaFeedData to Activity: {}", e.getMessage());
}
if( result != null )
@@ -178,13 +91,12 @@ public class InstagramTypeConverter implements StreamsProcessor {
@Override
public void prepare(Object o) {
- mapper = new StreamsJacksonMapper();
- instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+ instagramActivityUtil = new InstagramActivityUtil();
}
@Override
public void cleanUp() {
-
+ //noop
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fa3f9220/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 0561ba7..bd926d5 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -53,7 +53,9 @@ public class InstagramActivityUtil {
*/
public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException {
activity.setActor(buildActor(item));
- activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000));
+
+ if(item.getCreatedTime() != null)
+ activity.setPublished(new DateTime(Long.parseLong(item.getCreatedTime()) * 1000));
activity.setId(formatId(activity.getVerb(),
Optional.fromNullable(
@@ -78,16 +80,20 @@ public class InstagramActivityUtil {
public static Actor buildActor(MediaFeedData item) {
Actor actor = new Actor();
- Image image = new Image();
- image.setUrl(item.getUser().getProfilePictureUrl());
+ try {
+ Image image = new Image();
+ image.setUrl(item.getUser().getProfilePictureUrl());
- Map<String, Object> extensions = new HashMap<String, Object>();
- extensions.put("screenName", item.getUser().getUserName());
+ Map<String, Object> extensions = new HashMap<String, Object>();
+ extensions.put("screenName", item.getUser().getUserName());
- actor.setId(formatId(String.valueOf(item.getUser().getId())));
- actor.setImage(image);
- actor.setAdditionalProperty("extensions", extensions);
- actor.setAdditionalProperty("handle", item.getUser().getUserName());
+ actor.setId(formatId(String.valueOf(item.getUser().getId())));
+ actor.setImage(image);
+ actor.setAdditionalProperty("extensions", extensions);
+ actor.setAdditionalProperty("handle", item.getUser().getUserName());
+ } catch (Exception e) {
+ LOGGER.error("Exception trying to build actor object: {}", e.getMessage());
+ }
return actor;
}
@@ -244,9 +250,11 @@ public class InstagramActivityUtil {
addLocationExtension(activity, item);
- Map<String, Object> likes = new HashMap<String, Object>();
- likes.put("count", item.getLikes().getCount());
- extensions.put("likes", likes);
+ if(item.getLikes() != null) {
+ Map<String, Object> likes = new HashMap<String, Object>();
+ likes.put("count", item.getLikes().getCount());
+ extensions.put("likes", likes);
+ }
extensions.put("hashtags", item.getTags());