You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:34 UTC

[47/52] [abbrv] git commit: STREAMS-41 implementation and documentation

STREAMS-41
implementation and documentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c49e600a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c49e600a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c49e600a

Branch: refs/heads/sblackmon
Commit: c49e600af974d318c5555fce0f558bc161820870
Parents: 69d721e
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Apr 16 17:12:02 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Apr 16 17:12:02 2014 -0500

----------------------------------------------------------------------
 .../DatasiftInteractionActivitySerializer.mup   | 324 +++++++++++++++++++
 .../DatasiftInteractionActivitySerializer.png   | Bin 0 -> 142533 bytes
 .../streams-provider-datasift/README.markdown   |  22 ++
 .../provider/DatasiftEventClassifier.java       |  41 +++
 .../provider/DatasiftEventProcessor.java        |  23 +-
 .../provider/DatasiftStreamProvider.java        |   4 +-
 .../serializer/DatasiftActivitySerializer.java  | 181 -----------
 .../DatasiftInteractionActivitySerializer.java  | 172 ++++++++++
 .../DatasiftJsonActivitySerializer.java         |  75 +++++
 .../DatasiftTwitterActivitySerializer.java      |  33 ++
 .../serializer/StreamsDatasiftMapper.java       |  44 +++
 11 files changed, 729 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
new file mode 100644
index 0000000..3106ae8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.mup
@@ -0,0 +1,324 @@
+{
+  "title": "DatasiftInteractionActivitySerializer",
+  "id": 1,
+  "formatVersion": 2,
+  "ideas": {
+    "1": {
+      "title": "activity",
+      "id": 2,
+      "ideas": {
+        "1": {
+          "title": "provider\nid:providers:datasift",
+          "id": 5
+        },
+        "14": {
+          "title": "extensions",
+          "id": 8,
+          "ideas": {
+            "1": {
+              "title": "datasift",
+              "id": 9
+            },
+            "2": {
+              "title": "location",
+              "id": 22,
+              "ideas": {
+                "3": {
+                  "title": "coordinates",
+                  "id": 29,
+                  "ideas": {
+                    "2": {
+                      "title": "lat",
+                      "id": 27,
+                      "attr": {
+                        "style": {
+                          "background": "#E0E0E0"
+                        }
+                      }
+                    },
+                    "12": {
+                      "title": "lon",
+                      "id": 28,
+                      "attr": {
+                        "style": {
+                          "background": "#E0E0E0"
+                        }
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        },
+        "15": {
+          "title": "verb\npost",
+          "id": 30
+        },
+        "16": {
+          "title": "id\npost:$.interaction.id",
+          "id": 32
+        },
+        "17": {
+          "title": "published",
+          "id": 36
+        },
+        "18": {
+          "title": "content",
+          "id": 39
+        },
+        "19": {
+          "title": "title",
+          "id": 41
+        },
+        "20": {
+          "title": "url",
+          "id": 42
+        },
+        "0.5": {
+          "title": "actor",
+          "id": 10,
+          "ideas": {
+            "1": {
+              "title": "id",
+              "id": 11
+            },
+            "2": {
+              "title": "displayName",
+              "id": 12
+            },
+            "3": {
+              "title": "image",
+              "id": 13,
+              "ideas": {
+                "1": {
+                  "title": "url",
+                  "id": 14
+                }
+              }
+            },
+            "4": {
+              "title": "url",
+              "id": 18
+            }
+          }
+        },
+        "0.75": {
+          "title": "object",
+          "id": 19,
+          "ideas": {
+            "1": {
+              "title": "objectType",
+              "id": 20
+            }
+          }
+        }
+      }
+    },
+    "-1": {
+      "title": "datasift",
+      "id": 3,
+      "ideas": {
+        "1": {
+          "title": "interaction",
+          "id": 4,
+          "ideas": {
+            "1": {
+              "title": "author",
+              "id": 15,
+              "ideas": {
+                "1": {
+                  "title": "avatar",
+                  "id": 16
+                },
+                "2": {
+                  "title": "link",
+                  "id": 17
+                },
+                "0.5": {
+                  "title": "id",
+                  "id": 44
+                },
+                "0.75": {
+                  "title": "username",
+                  "id": 45
+                }
+              }
+            },
+            "2": {
+              "title": "contentType",
+              "id": 21
+            },
+            "3": {
+              "title": "geo",
+              "id": 23,
+              "ideas": {
+                "1": {
+                  "title": "latitude",
+                  "id": 25
+                },
+                "2": {
+                  "title": "longitude",
+                  "id": 26
+                }
+              }
+            },
+            "4": {
+              "title": "id",
+              "id": 33,
+              "ideas": {}
+            },
+            "5": {
+              "title": "createdAt",
+              "id": 37
+            },
+            "6": {
+              "title": "content",
+              "id": 38
+            },
+            "7": {
+              "title": "title",
+              "id": 40
+            },
+            "8": {
+              "title": "link",
+              "id": 43
+            }
+          }
+        }
+      }
+    }
+  },
+  "links": [
+    {
+      "ideaIdFrom": 3,
+      "ideaIdTo": 9,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 16,
+      "ideaIdTo": 14,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 17,
+      "ideaIdTo": 18,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 21,
+      "ideaIdTo": 20,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 25,
+      "ideaIdTo": 27,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 26,
+      "ideaIdTo": 28,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 33,
+      "ideaIdTo": 32,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 37,
+      "ideaIdTo": 36,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 38,
+      "ideaIdTo": 39,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 40,
+      "ideaIdTo": 41,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 43,
+      "ideaIdTo": 42,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 44,
+      "ideaIdTo": 11,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    },
+    {
+      "ideaIdFrom": 45,
+      "ideaIdTo": 12,
+      "attr": {
+        "style": {
+          "color": "#FF0000",
+          "lineStyle": "dashed"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png
new file mode 100644
index 0000000..7d410ba
Binary files /dev/null and b/streams-contrib/streams-provider-datasift/DatasiftInteractionActivitySerializer.png differ

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/README.markdown
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/README.markdown b/streams-contrib/streams-provider-datasift/README.markdown
new file mode 100644
index 0000000..c828d7e
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/README.markdown
@@ -0,0 +1,22 @@
+streams-provider-datasift
+
+Purpose
+
+  Module connects to datasift APIs, collects events, and passes each message downstream.
+
+EndPoints
+
+  * Streaming - supported, tested
+  * Push - not currently supported
+
+Normalization
+
+  Optionally, module can convert messages to ActivityStreams format
+
+  * Interactions [TwitterJsonTweetActivitySerializer]
+
+[DatasiftInteractionActivitySerializer]: DatasiftInteractionActivitySerializer
+
+  DatasiftInteractionActivitySerializer.class serializes interactions like this:
+
+  ![DatasiftInteractionActivitySerializer.png](DatasiftInteractionActivitySerializer.png)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
new file mode 100644
index 0000000..fa69419
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventClassifier.java
@@ -0,0 +1,41 @@
+package org.apache.streams.datasift.provider;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.datasift.config.Facebook;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.serializer.StreamsDatasiftMapper;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.datasift.youtube.YouTube;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 12/13/13.
+ */
+public class DatasiftEventClassifier {
+
+    public static Class detectClass( String json ) {
+
+        Preconditions.checkNotNull(json);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+
+        ObjectNode objectNode;
+        try {
+            objectNode = (ObjectNode) StreamsDatasiftMapper.getInstance().readTree(json);
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+
+        if( objectNode.findValue("twitter") != null )
+            return Twitter.class;
+        else if( objectNode.findValue("youtube") != null )
+            return YouTube.class;
+        else if( objectNode.findValue("facebook") != null )
+            return Facebook.class;
+        else
+            return Interaction.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
index 3c0aa8b..f78b8b9 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -4,8 +4,10 @@ import com.datasift.client.stream.Interaction;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
+import org.apache.streams.datasift.serializer.DatasiftInteractionActivitySerializer;
+import org.apache.streams.datasift.serializer.DatasiftJsonActivitySerializer;
 import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.slf4j.Logger;
@@ -29,7 +31,7 @@ public class DatasiftEventProcessor implements Runnable {
     private Class inClass;
     private Class outClass;
 
-    private DatasiftActivitySerializer datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
+    private DatasiftJsonActivitySerializer datasiftJsonActivitySerializer = new DatasiftJsonActivitySerializer();
 
     public final static String TERMINATE = new String("TERMINATE");
 
@@ -58,13 +60,20 @@ public class DatasiftEventProcessor implements Runnable {
                     break;
                 }
 
-                Thread.sleep(new Random().nextInt(100));
-
-                org.apache.streams.datasift.Datasift datasift = mapper.convertValue(item, Datasift.class);
+                String json;
+                org.apache.streams.datasift.Datasift datasift;
+                if(item instanceof String)
+                    json = (String)item;
+                if( item instanceof Interaction) {
+                    datasift = mapper.convertValue(item, Datasift.class);
+                    json = mapper.writeValueAsString(datasift);
+                } else {
+                    throw new ActivitySerializerException("unrecognized type");
+                }
 
                 // if the target is string, just pass-through
                 if( String.class.equals(outClass)) {
-                    outQueue.offer(new StreamsDatum(datasift.toString()));
+                    outQueue.offer(new StreamsDatum(json));
 
                 }
                 else if( Interaction.class.equals(outClass))
@@ -89,7 +98,7 @@ public class DatasiftEventProcessor implements Runnable {
                     // convert to desired format
                     Interaction entry = (Interaction) item;
                     if( entry != null ) {
-                        Activity out = datasiftInteractionActivitySerializer.deserialize(datasift);
+                        Activity out = datasiftJsonActivitySerializer.deserialize(json);
 
                         if( out != null )
                             outQueue.offer(new StreamsDatum(out));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index d339385..07b68c3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -44,7 +44,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
         this.config = config;
     }
 
-    protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(10000);
+    protected BlockingQueue inQueue = new LinkedBlockingQueue<Interaction>(1000);
 
     protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
 
@@ -176,7 +176,7 @@ public class DatasiftStreamProvider implements StreamsProvider {
 
             inQueue.offer(i);
 
-            if (count.incrementAndGet() % 1000 == 0) {
+            if (count.incrementAndGet() % 100 == 0) {
                 LOGGER.info("Processed {}:\n " + count.get());
 
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
deleted file mode 100644
index 93f4edd..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package org.apache.streams.datasift.serializer;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.pojo.json.*;
-
-import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
-
-/**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
-public class DatasiftActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
-
-    public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
-
-    ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+datasift.com.v1.1";
-    }
-
-    @Override
-    public Datasift serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
-    }
-
-    @Override
-    public Activity deserialize(Datasift serialized) {
-
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
-        try {
-
-            Activity activity = convert(serialized);
-
-            return activity;
-
-        } catch (Exception e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Datasift> datasifts) {
-        List<Activity> activities = Lists.newArrayList();
-        for( Datasift datasift : datasifts ) {
-            activities.add(deserialize(datasift));
-        }
-        return activities;
-    }
-
-    public static Date parse(String str) {
-        Date date;
-        String dstr;
-        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
-        DateFormat out = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-        try {
-            date = fmt.parse(str);
-            dstr = out.format(date);
-            return out.parse(dstr);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid date format", e);
-        }
-    }
-
-    public static Generator buildGenerator(Interaction interaction) {
-        return null;
-    }
-
-    public static Icon getIcon(Interaction interaction) {
-        return null;
-    }
-
-    public static Provider buildProvider(Interaction interaction) {
-        Provider provider = new Provider();
-        provider.setId("id:providers:twitter");
-        return provider;
-    }
-
-    public static String getUrls(Interaction interaction) {
-        return null;
-    }
-
-    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
-        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
-        extensions.put("datasift", datasift);
-    }
-
-    public static String formatId(String... idparts) {
-        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
-    }
-
-    public Activity convert(Datasift event) {
-
-        Activity activity = new Activity();
-        activity.setActor(buildActor(event.getInteraction()));
-        activity.setVerb("post");
-        activity.setObject(buildActivityObject(event.getInteraction()));
-        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
-        activity.setTarget(buildTarget(event.getInteraction()));
-        activity.setPublished(event.getInteraction().getCreatedAt());
-        activity.setGenerator(buildGenerator(event.getInteraction()));
-        activity.setIcon(getIcon(event.getInteraction()));
-        activity.setProvider(buildProvider(event.getInteraction()));
-        activity.setTitle(event.getInteraction().getTitle());
-        activity.setContent(event.getInteraction().getContent());
-        activity.setUrl(event.getInteraction().getLink());
-        activity.setLinks(getLinks(event.getInteraction()));
-        addDatasiftExtension(activity, event);
-        if( event.getInteraction().getGeo() != null) {
-            addLocationExtension(activity, event.getInteraction());
-        }
-        return activity;
-    }
-
-    public static Actor buildActor(Interaction interaction) {
-        Actor actor = new Actor();
-        actor.setId(formatId(interaction.getAuthor().getId().toString()));
-        actor.setDisplayName(interaction.getAuthor().getUsername());
-        Image image = new Image();
-        image.setUrl(interaction.getAuthor().getAvatar());
-        actor.setImage(image);
-        if (interaction.getAuthor().getLink()!=null){
-            actor.setUrl(interaction.getAuthor().getLink());
-        }
-        return actor;
-    }
-
-    public static ActivityObject buildActivityObject(Interaction interaction) {
-        ActivityObject actObj = new ActivityObject();
-        actObj.setObjectType(interaction.getContenttype());
-        return actObj;
-    }
-
-    public static List<Link> getLinks(Interaction interaction) {
-        List<Link> links = Lists.newArrayList();
-        return links;
-    }
-
-    public static ActivityObject buildTarget(Interaction interaction) {
-        return null;
-    }
-
-    public static void addLocationExtension(Activity activity, Interaction interaction) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        Map<String, Object> location = new HashMap<String, Object>();
-        Map<String, Double> coordinates = new HashMap<String, Double>();
-        coordinates.put("latitude", interaction.getGeo().getLatitude());
-        coordinates.put("longitude", interaction.getGeo().getLongitude());
-        location.put("coordinates", coordinates);
-        extensions.put("location", location);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
new file mode 100644
index 0000000..71cfc65
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInteractionActivitySerializer.java
@@ -0,0 +1,172 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+*/
+public class DatasiftInteractionActivitySerializer implements ActivitySerializer<String>, Serializable {
+
+    ObjectMapper mapper = new StreamsDatasiftMapper();
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+datasift.com.v1.1";
+    }
+
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON");
+    }
+
+    @Override
+    public Activity deserialize(String serialized) {
+
+        mapper = StreamsTwitterMapper.getInstance();
+
+        Datasift datasift = null;
+
+        try {
+            datasift = mapper.readValue(serialized, Datasift.class);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        Activity activity = new Activity();
+        try {
+
+            activity = convert(datasift);
+
+            return activity;
+
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = Lists.newArrayList();
+        for( String datasift : serializedList ) {
+            activities.add(deserialize(datasift));
+        }
+        return activities;
+    }
+
+    public static Generator buildGenerator(Interaction interaction) {
+        return null;
+    }
+
+    public static Icon getIcon(Interaction interaction) {
+        return null;
+    }
+
+    public static Provider buildProvider(Interaction interaction) {
+        Provider provider = new Provider();
+        provider.setId("id:providers:datasift");
+        return provider;
+    }
+
+    public static String getUrls(Interaction interaction) {
+        return null;
+    }
+
+    public static void addDatasiftExtension(Activity activity, Datasift datasift) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", datasift);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:datasift", idparts));
+    }
+
+    public Activity convert(Datasift event) {
+
+        Activity activity = new Activity();
+        activity.setActor(buildActor(event.getInteraction()));
+        activity.setVerb("post");
+        activity.setObject(buildActivityObject(event.getInteraction()));
+        activity.setId(formatId(activity.getVerb(), event.getInteraction().getId()));
+        activity.setTarget(buildTarget(event.getInteraction()));
+        activity.setPublished(event.getInteraction().getCreatedAt());
+        activity.setGenerator(buildGenerator(event.getInteraction()));
+        activity.setIcon(getIcon(event.getInteraction()));
+        activity.setProvider(buildProvider(event.getInteraction()));
+        activity.setTitle(event.getInteraction().getTitle());
+        activity.setContent(event.getInteraction().getContent());
+        activity.setUrl(event.getInteraction().getLink());
+        activity.setLinks(getLinks(event.getInteraction()));
+        addDatasiftExtension(activity, event);
+        if( event.getInteraction().getGeo() != null) {
+            addLocationExtension(activity, event.getInteraction());
+        }
+        return activity;
+    }
+
+    public static Actor buildActor(Interaction interaction) {
+        Actor actor = new Actor();
+        actor.setId(formatId(interaction.getAuthor().getId().toString()));
+        actor.setDisplayName(interaction.getAuthor().getUsername());
+        Image image = new Image();
+        image.setUrl(interaction.getAuthor().getAvatar());
+        actor.setImage(image);
+        if (interaction.getAuthor().getLink()!=null){
+            actor.setUrl(interaction.getAuthor().getLink());
+        }
+        return actor;
+    }
+
+    public static ActivityObject buildActivityObject(Interaction interaction) {
+        ActivityObject actObj = new ActivityObject();
+        actObj.setObjectType(interaction.getContenttype());
+        return actObj;
+    }
+
+    public static List<Link> getLinks(Interaction interaction) {
+        List<Link> links = Lists.newArrayList();
+        return links;
+    }
+
+    public static ActivityObject buildTarget(Interaction interaction) {
+        return null;
+    }
+
+    public static void addLocationExtension(Activity activity, Interaction interaction) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+        Map<String, Double> coordinates = new HashMap<String, Double>();
+        coordinates.put("latitude", interaction.getGeo().getLatitude());
+        coordinates.put("longitude", interaction.getGeo().getLongitude());
+        location.put("coordinates", coordinates);
+        extensions.put("location", location);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
new file mode 100644
index 0000000..4a6c860
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftJsonActivitySerializer.java
@@ -0,0 +1,75 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.datasift.provider.DatasiftEventClassifier;
+import org.apache.streams.datasift.twitter.Twitter;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Provider;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 3/26/14.
+ */
+public class DatasiftJsonActivitySerializer implements ActivitySerializer<String>
+{
+
+    public DatasiftJsonActivitySerializer() {
+
+    }
+
+    DatasiftInteractionActivitySerializer datasiftInteractionActivitySerializer = new DatasiftInteractionActivitySerializer();
+    DatasiftInteractionActivitySerializer datasiftTwitterActivitySerializer = new DatasiftTwitterActivitySerializer();
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+        Class documentSubType = DatasiftEventClassifier.detectClass(serialized);
+
+        Activity activity;
+        if( documentSubType == Twitter.class )
+            activity = datasiftTwitterActivitySerializer.deserialize(serialized);
+        else if( documentSubType == Interaction.class )
+            activity = datasiftInteractionActivitySerializer.deserialize(serialized);
+        else throw new ActivitySerializerException("unrecognized type");
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+
+    public static Provider getProvider() {
+        Provider provider = new Provider();
+        provider.setId("id:providers:datasift");
+        return provider;
+    }
+
+    public static void addTwitterExtension(Activity activity, ObjectNode event) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("datasift", event);
+    }
+
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:twitter", idparts));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
new file mode 100644
index 0000000..51bd985
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTwitterActivitySerializer.java
@@ -0,0 +1,33 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.datasift.Datasift;
+import org.apache.streams.datasift.interaction.Interaction;
+import org.apache.streams.pojo.json.*;
+import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+*/
+public class DatasiftTwitterActivitySerializer extends DatasiftInteractionActivitySerializer {
+
+    public Activity convert(Datasift event) {
+        Activity activity = super.convert(event);
+        activity.getExtensions().setAdditionalProperty("twitter", event.getTwitter());
+        return activity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c49e600a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
new file mode 100644
index 0000000..f5b39ce
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
@@ -0,0 +1,44 @@
+package org.apache.streams.datasift.serializer;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 3/27/14.
+ */
+public class StreamsDatasiftMapper extends StreamsJacksonMapper {
+
+    public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z");
+
+    private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper();
+
+    public static StreamsDatasiftMapper getInstance(){
+        return INSTANCE;
+    }
+
+    public StreamsDatasiftMapper() {
+        super();
+        registerModule(new SimpleModule()
+        {
+            {
+                addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
+                    @Override
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                        return DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
+                    }
+                });
+            }
+        });
+
+    }
+
+}