You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:34 UTC
[47/52] [abbrv] git commit: STREAMS-41 implementation and
documentation
STREAMS-41
implementation and documentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c49e600a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c49e600a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c49e600a
Branch: refs/heads/sblackmon
Commit: c49e600af974d318c5555fce0f558bc161820870
Parents: 69d721e
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 16 17:12:02 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 16 17:12:02 2014 -0500
----------------------------------------------------------------------
.../DatasiftInteractionActivitySerializer.mup | 324 +++++++++++++++++++
.../DatasiftInteractionActivitySerializer.png | Bin 0 -> 142533 bytes
.../streams-provider-datasift/README.markdown | 22 ++
.../provider/DatasiftEventClassifier.java | 41 +++
.../provider/DatasiftEventProcessor.java | 23 +-
.../provider/DatasiftStreamProvider.java | 4 +-
.../serializer/DatasiftActivitySerializer.java | 181 -----------
.../DatasiftInteractionActivitySerializer.java | 172 ++++++++++
.../DatasiftJsonActivitySerializer.java | 75 +++++
.../DatasiftTwitterActivitySerializer.java | 33 ++
.../serializer/StreamsDatasiftMapper.java | 44 +++
11 files changed, 729 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
new file mode 100644
index 0000000..3106ae8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
@@ -0,0 +1,324 @@
+{
+ "title": "DatasiftInteractionActivitySerializer",
+ "id": 1,
+ "formatVersion": 2,
+ "ideas": {
+ "1": {
+ "title": "activity",
+ "id": 2,
+ "ideas": {
+ "1": {
+ "title": "provider\nid:providers:datasift",
+ "id": 5
+ },
+ "14": {
+ "title": "extensions",
+ "id": 8,
+ "ideas": {
+ "1": {
+ "title": "datasift",
+ "id": 9
+ },
+ "2": {
+ "title": "location",
+ "id": 22,
+ "ideas": {
+ "3": {
+ "title": "coordinates",
+ "id": 29,
+ "ideas": {
+ "2": {
+ "title": "lat",
+ "id": 27,
+ "attr": {
+ "style": {
+ "background": "#E0E0E0"
+ }
+ }
+ },
+ "12": {
+ "title": "lon",
+ "id": 28,
+ "attr": {
+ "style": {
+ "background": "#E0E0E0"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "15": {
+ "title": "verb\npost",
+ "id": 30
+ },
+ "16": {
+ "title": "id\npost:$.interaction.id",
+ "id": 32
+ },
+ "17": {
+ "title": "published",
+ "id": 36
+ },
+ "18": {
+ "title": "content",
+ "id": 39
+ },
+ "19": {
+ "title": "title",
+ "id": 41
+ },
+ "20": {
+ "title": "url",
+ "id": 42
+ },
+ "0.5": {
+ "title": "actor",
+ "id": 10,
+ "ideas": {
+ "1": {
+ "title": "id",
+ "id": 11
+ },
+ "2": {
+ "title": "displayName",
+ "id": 12
+ },
+ "3": {
+ "title": "image",
+ "id": 13,
+ "ideas": {
+ "1": {
+ "title": "url",
+ "id": 14
+ }
+ }
+ },
+ "4": {
+ "title": "url",
+ "id": 18
+ }
+ }
+ },
+ "0.75": {
+ "title": "object",
+ "id": 19,
+ "ideas": {
+ "1": {
+ "title": "objectType",
+ "id": 20
+ }
+ }
+ }
+ }
+ },
+ "-1": {
+ "title": "datasift",
+ "id": 3,
+ "ideas": {
+ "1": {
+ "title": "interaction",
+ "id": 4,
+ "ideas": {
+ "1": {
+ "title": "author",
+ "id": 15,
+ "ideas": {
+ "1": {
+ "title": "avatar",
+ "id": 16
+ },
+ "2": {
+ "title": "link",
+ "id": 17
+ },
+ "0.5": {
+ "title": "id",
+ "id": 44
+ },
+ "0.75": {
+ "title": "username",
+ "id": 45
+ }
+ }
+ },
+ "2": {
+ "title": "contentType",
+ "id": 21
+ },
+ "3": {
+ "title": "geo",
+ "id": 23,
+ "ideas": {
+ "1": {
+ "title": "latitude",
+ "id": 25
+ },
+ "2": {
+ "title": "longitude",
+ "id": 26
+ }
+ }
+ },
+ "4": {
+ "title": "id",
+ "id": 33,
+ "ideas": {}
+ },
+ "5": {
+ "title": "createdAt",
+ "id": 37
+ },
+ "6": {
+ "title": "content",
+ "id": 38
+ },
+ "7": {
+ "title": "title",
+ "id": 40
+ },
+ "8": {
+ "title": "link",
+ "id": 43
+ }
+ }
+ }
+ }
+ }
+ },
+ "links": [
+ {
+ "ideaIdFrom": 3,
+ "ideaIdTo": 9,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 16,
+ "ideaIdTo": 14,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 17,
+ "ideaIdTo": 18,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 21,
+ "ideaIdTo": 20,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 25,
+ "ideaIdTo": 27,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 26,
+ "ideaIdTo": 28,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 33,
+ "ideaIdTo": 32,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 37,
+ "ideaIdTo": 36,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 38,
+ "ideaIdTo": 39,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 40,
+ "ideaIdTo": 41,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 43,
+ "ideaIdTo": 42,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 44,
+ "ideaIdTo": 11,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ },
+ {
+ "ideaIdFrom": 45,
+ "ideaIdTo": 12,
+ "attr": {
+ "style": {
+ "color": "#FF0000",
+ "lineStyle": "dashed"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png
new file mode 100644
index 0000000..7d410ba
Binary files /dev/null and b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png differ
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/README.markdown
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/README.markdown b/streams-contrib/streams-provider-datasift/README.markdown
new file mode 100644
index 0000000..c828d7e
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/README.markdown
@@ -0,0 +1,22 @@
+streams-provider-datasift
+
+Purpose
+
+ Module connects to datasift APIs, collects events, and passes each message downstream.
+
+EndPoints
+
+ * Streaming - supported, tested
+ * Push - not currently supported
+
+Normalization
+
+ Optionally, module can convert messages to ActivityStreams format
+
+ * Interactions [DatasiftInteractionActivitySerializer]
+
+[DatasiftInteractionActivitySerializer]: DatasiftInteractionActivitySerializer
+
+ DatasiftInteractionActivitySerializer.class serializes interactions like this:
+
+ ![DatasiftInteractionActivitySerializer.png](DatasiftInteractionActivitySerializer.png)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
new file mode 100644
index 0000000..fa69419
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
@@ -0,0 +1,41 @@
+package org.apache.streams.datasift.provider;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.config.Facebook;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.serializer.StreamsDatasiftMapper;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.youtube.YouTube;
+
+import java.io.IOException;
+
+/**
+ * Guesses which Datasift document type a raw JSON event represents.
+ *
+ * <p>Classification only looks for a source-specific field name
+ * ("twitter", "youtube", "facebook") via {@code findValue}; any other JSON
+ * object is treated as a generic {@link Interaction}.
+ *
+ * <p>NOTE(review): {@code Facebook} is imported from the {@code config} package while
+ * {@code Twitter} and {@code YouTube} come from their own packages - confirm this is
+ * the intended type.
+ */
+public class DatasiftEventClassifier {
+
+    /**
+     * Detects the Datasift document type of a JSON string.
+     *
+     * @param json the raw event; must be non-null and non-empty
+     * @return {@code Twitter.class}, {@code YouTube.class}, {@code Facebook.class} or
+     *         {@code Interaction.class}; {@code null} when the text cannot be read
+     *         as a JSON object
+     * @throws NullPointerException if {@code json} is null
+     * @throws IllegalArgumentException if {@code json} is empty
+     */
+    public static Class<?> detectClass( String json ) {
+
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+        Object tree;
+        try {
+            tree = StreamsDatasiftMapper.getInstance().readTree(json);
+        } catch (IOException e) {
+            // Unparseable input is reported as "no type" (null) rather than propagated.
+            // NOTE(review): route this through the project logger instead of stderr.
+            e.printStackTrace();
+            return null;
+        }
+
+        // Valid JSON that is not an object (array, scalar, ...) previously escaped as a
+        // ClassCastException; treat it like any other unclassifiable input.
+        if( !(tree instanceof ObjectNode) ) {
+            return null;
+        }
+        ObjectNode objectNode = (ObjectNode) tree;
+
+        if( objectNode.findValue("twitter") != null ) {
+            return Twitter.class;
+        } else if( objectNode.findValue("youtube") != null ) {
+            return YouTube.class;
+        } else if( objectNode.findValue("facebook") != null ) {
+            return Facebook.class;
+        } else {
+            return Interaction.class;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
index 3c0aa8b..f78b8b9 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -4,8 +4,10 @@ import com.datasift.client.stream.Interaction;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.serializer.DatasiftInteractionActivitySerializer;
+import org.apache.streams.datasift.serializer.DatasiftJsonActivitySerializer;
import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.Tweet;
import org.slf4j.Logger;
@@ -29,7 +31,7 @@ public class DatasiftEventProcessor implements Runnable {
private Class inClass;
private Class outClass;
- private DatasiftActivitySerializer datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+ private DatasiftJsonActivitySerializer datasiftJsonActivitySerializer = new DatasiftJsonActivitySerializer();
public final static String TERMINATE = new String("TERMINATE");
@@ -58,13 +60,20 @@ public class DatasiftEventProcessor implements Runnable {
break;
}
- Thread.sleep(new Random().nextInt(100));
-
- org.apache.streams.datasift.Datasift datasift = mapper.convertValue(item, Datasift.class);
+ String json;
+ org.apache.streams.datasift.Datasift datasift;
+ if(item instanceof String)
+ json = (String)item;
+ if( item instanceof Interaction) {
+ datasift = mapper.convertValue(item, Datasift.class);
+ json = mapper.writeValueAsString(datasift);
+ } else {
+ throw new ActivitySerializerException("unrecognized type");
+ }
// if the target is string, just pass-through
if( String.class.equals(outClass)) {
- outQueue.offer(new StreamsDatum(datasift.toString()));
+ outQueue.offer(new StreamsDatum(json));
}
else if( Interaction.class.equals(outClass))
@@ -89,7 +98,7 @@ public class DatasiftEventProcessor implements Runnable {
// convert to desired format
Interaction entry = (Interaction) item;
if( entry != null ) {
- Activity out = datasiftInteractionActivitySerializer.deserialize(datasift);
+ Activity out = datasiftJsonActivitySerializer.deserialize(json);
if( out != null )
outQueue.offer(new StreamsDatum(out));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index d339385..07b68c3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -44,7 +44,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
this.config = config;
}
- protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(10000);
+ protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(1000);
protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
@@ -176,7 +176,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
inQueue.offer(i);
- if (count.incrementAndGet() % 1000 == 0) {
+ if (count.incrementAndGet() % 100 == 0) {
LOGGER.info("Processed {}:\n " + count.get());
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
deleted file mode 100644
index 93f4edd..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.pojo.json.*;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
- public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
- ObjectMapper mapper = new ObjectMapper();
-
- @Override
- public String serializationFormat() {
- return "application/json+datasift.com.v1.1";
- }
-
- @Override
- public Datasift serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
- }
-
- @Override
- public Activity deserialize(Datasift serialized) {
-
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
- mapper.setAnnotationIntrospector(introspector);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
- try {
-
- Activity activity = convert(serialized);
-
- return activity;
-
- } catch (Exception e) {
- throw new IllegalArgumentException("Unable to deserialize", e);
- }
-
- }
-
- @Override
- public List<Activity> deserializeAll(List<Datasift> datasifts) {
- List<Activity> activities = Lists.newArrayList();
- for( Datasift datasift : datasifts ) {
- activities.add(deserialize(datasift));
- }
- return activities;
- }
-
- public static Date parse(String str) {
- Date date;
- String dstr;
- DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
- DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- try {
- date = fmt.parse(str);
- dstr = out.format(date);
- return out.parse(dstr);
- } catch (ParseException e) {
- throw new IllegalArgumentException("Invalid date format", e);
- }
- }
-
- public static Generator buildGenerator(Interaction interaction) {
- return null;
- }
-
- public static Icon getIcon(Interaction interaction) {
- return null;
- }
-
- public static Provider buildProvider(Interaction interaction) {
- Provider provider = new Provider();
- provider.setId("id:providers:twitter");
- return provider;
- }
-
- public static String getUrls(Interaction interaction) {
- return null;
- }
-
- public static void addDatasiftExtension(Activity activity, Datasift datasift) {
- Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
- extensions.put("datasift", datasift);
- }
-
- public static String formatId(String... idparts) {
- return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
- }
-
- public Activity convert(Datasift event) {
-
- Activity activity = new Activity();
- activity.setActor(buildActor(event.getInteraction()));
- activity.setVerb("post");
- activity.setObject(buildActivityObject(event.getInteraction()));
- activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
- activity.setTarget(buildTarget(event.getInteraction()));
- activity.setPublished(event.getInteraction().getCreatedAt());
- activity.setGenerator(buildGenerator(event.getInteraction()));
- activity.setIcon(getIcon(event.getInteraction()));
- activity.setProvider(buildProvider(event.getInteraction()));
- activity.setTitle(event.getInteraction().getTitle());
- activity.setContent(event.getInteraction().getContent());
- activity.setUrl(event.getInteraction().getLink());
- activity.setLinks(getLinks(event.getInteraction()));
- addDatasiftExtension(activity, event);
- if( event.getInteraction().getGeo() != null) {
- addLocationExtension(activity, event.getInteraction());
- }
- return activity;
- }
-
- public static Actor buildActor(Interaction interaction) {
- Actor actor = new Actor();
- actor.setId(formatId(interaction.getAuthor().getId().toString()));
- actor.setDisplayName(interaction.getAuthor().getUsername());
- Image image = new Image();
- image.setUrl(interaction.getAuthor().getAvatar());
- actor.setImage(image);
- if (interaction.getAuthor().getLink()!=null){
- actor.setUrl(interaction.getAuthor().getLink());
- }
- return actor;
- }
-
- public static ActivityObject buildActivityObject(Interaction interaction) {
- ActivityObject actObj = new ActivityObject();
- actObj.setObjectType(interaction.getContenttype());
- return actObj;
- }
-
- public static List<Link> getLinks(Interaction interaction) {
- List<Link> links = Lists.newArrayList();
- return links;
- }
-
- public static ActivityObject buildTarget(Interaction interaction) {
- return null;
- }
-
- public static void addLocationExtension(Activity activity, Interaction interaction) {
- Map<String, Object> extensions = ensureExtensions(activity);
- Map<String, Object> location = new HashMap<String, Object>();
- Map<String, Double> coordinates = new HashMap<String, Double>();
- coordinates.put("latitude", interaction.getGeo().getLatitude());
- coordinates.put("longitude", interaction.getGeo().getLongitude());
- location.put("coordinates", coordinates);
- extensions.put("location", location);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
new file mode 100644
index 0000000..71cfc65
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
@@ -0,0 +1,172 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Converts Datasift interaction JSON into ActivityStreams {@link Activity} objects.
+ *
+ * <p>Only the Datasift-to-Activity direction is supported; {@link #serialize(Activity)}
+ * always throws. Subclasses may override {@link #convert(Datasift)} to add
+ * source-specific data to the resulting activity.
+ */
+public class DatasiftInteractionActivitySerializer implements ActivitySerializer<String>, Serializable {
+
+    /** Mapper configured for Datasift documents (including the Datasift date format). */
+    ObjectMapper mapper = StreamsDatasiftMapper.getInstance();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    /**
+     * Parses one Datasift JSON document and converts it to an {@link Activity}.
+     *
+     * @param serialized the Datasift JSON document
+     * @return the converted activity
+     * @throws IllegalArgumentException if the document cannot be parsed or converted;
+     *         the underlying failure is attached as the cause
+     */
+    @Override
+    public Activity deserialize(String serialized) {
+        try {
+            // Parse with the Datasift mapper held by this instance. Parsing and conversion
+            // share one try block so a parse error is reported with its real cause instead
+            // of being printed and resurfacing later as a NullPointerException.
+            Datasift datasift = mapper.readValue(serialized, Datasift.class);
+            return convert(datasift);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+    }
+
+    /**
+     * Converts every document in the list, in order.
+     *
+     * @param serializedList the Datasift JSON documents
+     * @return one activity per input document
+     * @throws IllegalArgumentException if any document fails to deserialize
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = Lists.newArrayList();
+        for( String datasift : serializedList ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    /** Placeholder: no generator is derived from an interaction yet; returns {@code null}. */
+    public static Generator buildGenerator(Interaction interaction) {
+        return null;
+    }
+
+    /** Placeholder: no icon is derived from an interaction yet; returns {@code null}. */
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    /** Builds the provider, which always carries the id {@code id:providers:datasift}. */
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:datasift");
+        return provider;
+    }
+
+    /** Placeholder: no urls are derived from an interaction yet; returns {@code null}. */
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    /** Stores the complete Datasift document under the {@code datasift} extension key. */
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    /** Joins {@code id:datasift} and the given parts with {@code ':'}. */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    /**
+     * Maps a parsed Datasift document onto a new {@link Activity}.
+     *
+     * <p>The verb is always {@code post}; the activity id is built from the verb and the
+     * interaction id. A location extension is added only when the interaction has geo data.
+     *
+     * @param event the parsed Datasift document; its interaction must be present
+     * @return the populated activity
+     */
+    public Activity convert(Datasift event) {
+
+        Interaction interaction = event.getInteraction();
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(interaction));
+        activity.setVerb("post");
+        activity.setObject(buildActivityObject(interaction));
+        activity.setId(formatId(activity.getVerb(), interaction.getId()));
+        activity.setTarget(buildTarget(interaction));
+        activity.setPublished(interaction.getCreatedAt());
+        activity.setGenerator(buildGenerator(interaction));
+        activity.setIcon(getIcon(interaction));
+        activity.setProvider(buildProvider(interaction));
+        activity.setTitle(interaction.getTitle());
+        activity.setContent(interaction.getContent());
+        activity.setUrl(interaction.getLink());
+        activity.setLinks(getLinks(interaction));
+        addDatasiftExtension(activity, event);
+        if( interaction.getGeo() != null ) {
+            addLocationExtension(activity, interaction);
+        }
+        return activity;
+    }
+
+    /**
+     * Builds the actor from the interaction's author: id, display name (username),
+     * avatar image and, when present, the author link as url.
+     */
+    public static Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        actor.setId(formatId(interaction.getAuthor().getId().toString()));
+        actor.setDisplayName(interaction.getAuthor().getUsername());
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if( interaction.getAuthor().getLink() != null ) {
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    /** Builds the activity object, using the interaction's content type as object type. */
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        return actObj;
+    }
+
+    /** Placeholder: no links are extracted yet; returns a new empty list. */
+    public static List<Link> getLinks(Interaction interaction) {
+        List<Link> links = Lists.newArrayList();
+        return links;
+    }
+
+    /** Placeholder: no target is derived from an interaction yet; returns {@code null}. */
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    /**
+     * Adds a {@code location} extension holding a {@code coordinates} map with the
+     * interaction's {@code latitude} and {@code longitude}.
+     *
+     * <p>The caller must make sure the interaction has geo data.
+     */
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
new file mode 100644
index 0000000..4a6c860
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
@@ -0,0 +1,75 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.provider.DatasiftEventClassifier;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Provider;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry-point serializer for raw Datasift JSON.
+ *
+ * <p>Each document is classified with {@link DatasiftEventClassifier} and handed to the
+ * matching type-specific serializer. Only deserialization of single documents is
+ * implemented.
+ */
+public class DatasiftJsonActivitySerializer implements ActivitySerializer<String>
+{
+
+    public DatasiftJsonActivitySerializer() {
+
+    }
+
+    /** Handles documents classified as generic interactions. */
+    DatasiftInteractionActivitySerializer datasiftInteractionActivitySerializer = new DatasiftInteractionActivitySerializer();
+    /** Handles documents classified as Twitter events. */
+    DatasiftInteractionActivitySerializer datasiftTwitterActivitySerializer = new DatasiftTwitterActivitySerializer();
+
+    /** Returns {@code null}; no format identifier is declared for this dispatcher. */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Classifies the document and converts it with the matching serializer.
+     *
+     * @param serialized the raw Datasift JSON document
+     * @return the converted activity
+     * @throws ActivitySerializerException if the document is classified as anything other
+     *         than a Twitter event or a generic interaction (including when
+     *         classification yields no type at all)
+     */
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+        Class documentSubType = DatasiftEventClassifier.detectClass(serialized);
+
+        Activity activity;
+        if( documentSubType == Twitter.class ) {
+            activity = datasiftTwitterActivitySerializer.deserialize(serialized);
+        } else if( documentSubType == Interaction.class ) {
+            activity = datasiftInteractionActivitySerializer.deserialize(serialized);
+        } else {
+            throw new ActivitySerializerException("unrecognized type");
+        }
+
+        return activity;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+
+    /** Builds the provider, which always carries the id {@code id:providers:datasift}. */
+    public static Provider getProvider() {
+        Provider provider = new Provider();
+        provider.setId("id:providers:datasift");
+        return provider;
+    }
+
+    /**
+     * Stores the raw event under the {@code datasift} extension key.
+     *
+     * <p>NOTE(review): the method name says "Twitter" but the key written is
+     * {@code datasift} - confirm which of the two is intended.
+     */
+    public static void addTwitterExtension(Activity activity, ObjectNode event) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", event);
+    }
+
+    /**
+     * Joins {@code id:datasift} and the given parts with {@code ':'}.
+     *
+     * <p>The prefix was {@code id:twitter}; it now matches the provider id above and
+     * {@code DatasiftInteractionActivitySerializer.formatId}.
+     */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
new file mode 100644
index 0000000..51bd985
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
@@ -0,0 +1,33 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Serializer for Datasift documents that originate from Twitter.
+ *
+ * <p>Performs the generic interaction conversion of the parent class and additionally
+ * attaches the document's Twitter payload to the activity.
+ */
+public class DatasiftTwitterActivitySerializer extends DatasiftInteractionActivitySerializer {
+
+    /**
+     * Converts the event like the parent class does, then stores the value of
+     * {@code event.getTwitter()} on the activity's extensions under the key
+     * {@code twitter}.
+     *
+     * @param event the parsed Datasift document
+     * @return the populated activity
+     */
+    @Override
+    public Activity convert(Datasift event) {
+        Activity activity = super.convert(event);
+        activity.getExtensions().setAdditionalProperty("twitter", event.getTwitter());
+        return activity;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
new file mode 100644
index 0000000..f5b39ce
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
@@ -0,0 +1,44 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsDatasiftMapper extends StreamsJacksonMapper {
+
+ public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+
+ private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper();
+
+ public static StreamsDatasiftMapper getInstance(){
+ return INSTANCE;
+ }
+
+ public StreamsDatasiftMapper() {
+ super();
+ registerModule(new SimpleModule()
+ {
+ {
+ addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+ @Override
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+ return DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+ }
+ });
+ }
+ });
+
+ }
+
+}