You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/07 19:59:23 UTC

incubator-streams git commit: need to add these classes to SCM

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-212 612676d4e -> 800bce964


need to add these classes to SCM


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/800bce96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/800bce96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/800bce96

Branch: refs/heads/STREAMS-212
Commit: 800bce964753ab4d4c8fe1a0fbf13b1557c7c48b
Parents: 612676d
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 7 10:58:39 2014 -0800
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 7 10:58:39 2014 -0800

----------------------------------------------------------------------
 .../serializer/DatasiftEventClassifier.java     |  35 +++
 .../DatasiftInteractionActivitySerializer.java  | 222 +++++++++++++++++++
 2 files changed, 257 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/800bce96/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
new file mode 100644
index 0000000..e90de6a
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftEventClassifier.java
@@ -0,0 +1,35 @@
+package org.apache.streams.datasift.serializer;
+
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.instagram.Instagram;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.twitter.Twitter;
+
+/**
+ * Created by sblackmon on 11/6/14.
+ */
+public class DatasiftEventClassifier {
+
+    public static Class detectClass(Datasift event) {
+
+        if(event.getTwitter() != null) {
+            return Twitter.class;
+        } else if(event.getInstagram() != null) {
+            return Instagram.class;
+        } else {
+            return Interaction.class;
+        }
+    }
+
+    public static ActivitySerializer bestSerializer(Datasift event) {
+
+        if(event.getTwitter() != null) {
+            return DatasiftTweetActivitySerializer.getInstance();
+        } else if(event.getInstagram() != null) {
+            return DatasiftInstagramActivitySerializer.getInstance();
+        } else {
+            return DatasiftInteractionActivitySerializer.getInstance();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/800bce96/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
new file mode 100644
index 0000000..c856dc2
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
@@ -0,0 +1,222 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.links.Links;
+import org.apache.streams.datasift.util.StreamsDatasiftMapper;
+import org.apache.streams.pojo.json.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ *
+ */
+public class DatasiftInteractionActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInteractionActivitySerializer.class);
+
+    private static DatasiftInteractionActivitySerializer instance = new DatasiftInteractionActivitySerializer();
+
+    public static DatasiftInteractionActivitySerializer getInstance() {
+        return instance;
+    }
+
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    @Override
+    public Datasift serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    public Activity deserialize(String datasiftJson) {
+        try {
+            return deserialize(this.mapper.readValue(datasiftJson, Datasift.class));
+        } catch (Exception e) {
+            LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson);
+            LOGGER.error("Exception : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Activity deserialize(Datasift serialized) {
+
+        try {
+
+            Activity activity = convert(serialized);
+
+            return activity;
+
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<Datasift> datasifts) {
+        List<Activity> activities = Lists.newArrayList();
+        for( Datasift datasift : datasifts ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    public static Generator buildGenerator(Interaction interaction) {
+        Generator generator = new Generator();
+        generator.setDisplayName(interaction.getSource());
+        generator.setId(interaction.getSource());
+        return generator;
+    }
+
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:"+interaction.getType());
+        provider.setDisplayName(interaction.getType());
+        return provider;
+    }
+
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    public Activity convert(Datasift event) {
+
+        Preconditions.checkNotNull(event);
+        Preconditions.checkNotNull(event.getInteraction());
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event.getInteraction()));
+        activity.setVerb(selectVerb(event));
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(buildProvider(event.getInteraction()));
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        activity.setLinks(getLinks(event));
+        addDatasiftExtension(activity, event);
+        if( event.getInteraction().getGeo() != null) {
+            addLocationExtension(activity, event.getInteraction());
+        }
+        return activity;
+    }
+
+    private String selectVerb(Datasift event) {
+        return "post";
+    }
+
+    public Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        org.apache.streams.datasift.interaction.Author author = interaction.getAuthor();
+        if(author == null) {
+            LOGGER.warn("Interaction does not contain author information.");
+            return actor;
+        }
+        String userName = author.getUsername();
+        String name = author.getName();
+        Long id  = author.getId();
+        if(userName != null) {
+            actor.setDisplayName(userName);
+        } else {
+            actor.setDisplayName(name);
+        }
+
+        if(id != null) {
+            actor.setId(id.toString());
+        } else {
+            if(userName != null)
+                actor.setId(userName);
+            else
+                actor.setId(name);
+        }
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if (interaction.getAuthor().getLink()!=null){
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        actObj.setUrl(interaction.getLink());
+        actObj.setId(formatId("post",interaction.getId()));
+        actObj.setContent(interaction.getContent());
+
+        return actObj;
+    }
+
+    public static List<String> getLinks(Datasift event) {
+        List<String> result = Lists.newArrayList();
+        Links links = event.getLinks();
+        if(links == null)
+            return null;
+        for(Object link : links.getNormalizedUrl()) {
+            if(link != null) {
+                if(link instanceof String) {
+                    result.add((String) link);
+                } else {
+                    LOGGER.warn("link is not of type String : {}", link.getClass().getName());
+                }
+            }
+        }
+        return result;
+    }
+
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+    public static String firstStringIfNotNull(List<Object> list) {
+        if( list != null && list.size() > 0) {
+            return (String) list.get(0);
+        } else return null;
+    }
+}