You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:39 UTC

[52/52] [abbrv] git commit: sblackmon changes to sblackmon branch

sblackmon changes to sblackmon branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/040e2c4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/040e2c4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/040e2c4c

Branch: refs/heads/sblackmon
Commit: 040e2c4c605f88aebcaa5979ed6a58aeed84fa49
Parents: ef59fa4
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Apr 29 15:10:25 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Apr 29 15:10:25 2014 -0500

----------------------------------------------------------------------
 .../streams/console/ConsolePersistWriter.java   |  2 +
 .../streams-persist-elasticsearch/pom.xml       |  2 +-
 .../ElasticsearchPersistWriter.java             | 64 ++++++++------
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |  3 +-
 .../apache/streams/json/JsonPathExtractor.java  |  6 +-
 .../json/test/JsonPathExtractorTest.java        | 88 +++++++++++++++++++
 .../apache/streams/json/test/JsonPathTest.java  | 92 --------------------
 .../provider/DatasiftEventProcessor.java        |  1 +
 .../serializer/StreamsDatasiftMapper.java       |  9 +-
 .../main/jsonschema/com/datasift/Datasift.json  | 14 ++-
 .../test/DatasiftEventClassifierTest.java       | 15 ++--
 .../src/main/xmlschema/com/sysomos/sysomos.xsd  |  2 +-
 .../twitter/processor/TwitterTypeConverter.java | 17 +++-
 .../serializer/StreamsTwitterMapper.java        | 12 +++
 .../TwitterJsonRetweetActivitySerializer.java   |  3 +-
 .../TwitterJsonTweetActivitySerializer.java     |  8 +-
 ...erJsonUserstreameventActivitySerializer.java | 32 ++++---
 .../jsonschema/com/twitter/UserstreamEvent.json | 20 ++++-
 .../streams/twitter/test/TweetSerDeTest.java    |  2 -
 .../org/apache/streams/core/StreamsDatum.java   |  4 +
 .../streams/jackson/StreamsJacksonMapper.java   |  3 +-
 .../streams/jackson/StreamsJacksonModule.java   |  5 ++
 streams-runtimes/streams-runtime-pig/pom.xml    | 39 ++++++---
 .../streams/pig/StreamsProcessDatumExec.java    | 34 ++++++--
 .../streams/pig/StreamsProcessDocumentExec.java | 67 +++++---------
 .../streams/pig/StreamsSerializerExec.java      | 23 +++--
 .../src/test/java/PigProcessorTest.java         | 32 -------
 .../src/test/java/PigSerializerTest.java        | 40 ---------
 .../streams/pig/test/PigProcessDatumTest.java   | 49 +++++++++++
 .../streams/pig/test/PigSerializerTest.java     | 46 ++++++++++
 .../src/test/resources/pigserializertest.pig    |  9 +-
 .../org/apache/streams/util/ComponentUtils.java | 16 ++++
 32 files changed, 438 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index e8efc6d..1f04ed3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -13,6 +13,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class ConsolePersistWriter implements StreamsPersistWriter {
 
+    public final static String STREAMS_ID = "ConsolePersistWriter";
+
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
     protected volatile Queue<StreamsDatum> persistQueue;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 95f3357..12371b2 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -12,7 +12,7 @@
     <artifactId>streams-persist-elasticsearch</artifactId>
 
     <properties>
-        <elasticsearch.version>1.1.0</elasticsearch.version>
+        <elasticsearch.version>1.1.1</elasticsearch.version>
     </properties>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 405d1b8..71a42ea 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
+import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -17,6 +18,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
@@ -24,6 +26,8 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.joda.time.DateTime;
+import org.joda.time.format.ISODateTimeFormat;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
@@ -118,13 +122,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         String json;
         try {
             String id = streamsDatum.getId();
+            String ts = Long.toString(streamsDatum.getTimestamp().getMillis());
             if( streamsDatum.getDocument() instanceof String )
                 json = streamsDatum.getDocument().toString();
             else {
                 json = mapper.writeValueAsString(streamsDatum.getDocument());
             }
 
-            add(config.getIndex(), config.getType(), id, json);
+            add(config.getIndex(), config.getType(), id, json, ts);
 
         } catch (Exception e) {
             LOGGER.warn("{} {}", e.getMessage());
@@ -333,20 +338,24 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     public void add(String indexName, String type, String json)
     {
-        add(indexName, type, null, json);
+        add(indexName, type, null, json, null);
     }
 
-    public void add(String indexName, String type, String id, String json)
+    public void add(String indexName, String type, String id, String json, String ts)
     {
-        IndexRequest indexRequest;
+        IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(manager.getClient());
 
-        // They didn't specify an ID, so we will create one for them.
-        if(id == null)
-            indexRequest = new IndexRequest(indexName, type);
-        else
-            indexRequest = new IndexRequest(indexName, type, id);
+        indexRequestBuilder.setIndex(indexName);
+        indexRequestBuilder.setType(type);
 
-        indexRequest.source(json);
+        indexRequestBuilder.setSource(json);
+
+        // / They didn't specify an ID, so we will create one for them.
+        if(id != null)
+            indexRequestBuilder.setId(id);
+
+        if(ts != null)
+            indexRequestBuilder.setTimestamp(ts);
 
         // If there is a parentID that is associated with this bulk, then we are
         // going to have to parse the raw JSON and attempt to dereference
@@ -357,7 +366,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             {
                 // The JSONObject constructor can throw an exception, it is called
                 // out explicitly here so we can catch it.
-                indexRequest = indexRequest.parent(new JSONObject(json).getString(parentID));
+                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
             }
             catch(JSONException e)
             {
@@ -365,7 +374,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                 totalFailed++;
             }
         }
-        add(indexRequest);
+        add(indexRequestBuilder.request());
     }
 
     public void add(UpdateRequest updateRequest)
@@ -479,22 +488,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             }
         }
     }
-    public void add(String indexName, String type, Map<String, Object> toImport)
-    {
-        for (String id : toImport.keySet())
-            add(indexName, type, id, (String)toImport.get(id));
-    }
 
-    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
-    {
-        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
-        for(String toAddId : workingBatch.keySet())
-            if(!invalidIDs.contains(toAddId))
-                add(index, type, toAddId, workingBatch.get(toAddId));
-
-        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
-    }
+//    public void add(String indexName, String type, Map<String, Object> toImport)
+//    {
+//        for (String id : toImport.keySet())
+//            add(indexName, type, id, (String)toImport.get(id));
+//    }
+//
+//    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
+//    {
+//        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
+//
+//        for(String toAddId : workingBatch.keySet())
+//            if(!invalidIDs.contains(toAddId))
+//                add(index, type, toAddId, workingBatch.get(toAddId));
+//
+//        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
+//    }
 
 
     private Set<String> checkIds(Set<String> input, String index, String type) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index b04350e..65bd6dc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -4,6 +4,7 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
                         if( !Strings.isNullOrEmpty(line) ) {
                             reader.countersCurrent.incrementAttempt();
                             String[] fields = line.split(Character.toString(reader.DELIMITER));
-                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
                             write( entry );
                             reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index 3059b1e..eab1da1 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -24,7 +24,7 @@ import java.util.List;
 public class JsonPathExtractor implements StreamsProcessor {
 
     public JsonPathExtractor() {
-        System.out.println("creating JsonPathExtractor for nada");
+        System.out.println("creating JsonPathExtractor");
     }
 
     public JsonPathExtractor(String pathExpression) {
@@ -85,8 +85,7 @@ public class JsonPathExtractor implements StreamsProcessor {
                             StreamsDatum matchDatum = new StreamsDatum(match);
                             result.add(matchDatum);
                         } else if ( item instanceof JSONObject ) {
-                            ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(item), ObjectNode.class);
-                            StreamsDatum matchDatum = new StreamsDatum(objectNode);
+                            StreamsDatum matchDatum = new StreamsDatum(item);
                             result.add(matchDatum);
                         }
                     }
@@ -109,6 +108,7 @@ public class JsonPathExtractor implements StreamsProcessor {
     public void prepare(Object configurationObject) {
         if( configurationObject instanceof String )
             jsonPath = JsonPath.compile((String)(configurationObject));
+
         mapper.registerModule(new JsonOrgModule());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
new file mode 100644
index 0000000..2484939
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -0,0 +1,88 @@
+package org.apache.streams.json.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.json.JsonPathExtractor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class JsonPathExtractorTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
+
+    private String testJson;
+
+    @Before
+    public void initialize() {
+        try {
+            testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
+        } catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void test1()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[*].author");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(2));
+        assertTrue(result.get(0).getDocument() instanceof String);
+        assertTrue(result.get(1).getDocument() instanceof String);
+    }
+
+    @Ignore
+    @Test
+    public void test2()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.category == 'reference')]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+    @Ignore
+    @Test
+    public void test3()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.price > 10)]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+    @Ignore
+    @Test
+    public void test4()
+    {
+        JsonPathExtractor extractor = new JsonPathExtractor();
+        extractor.prepare("$.store.book[?(@.isbn)]");
+        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+        assertThat(result.size(), is(1));
+        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
deleted file mode 100644
index fe3a41b..0000000
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.streams.json.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.json.JsonPathExtractor;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.List;
-import java.util.Scanner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class JsonPathTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathTest.class);
-
-    private String testJson;
-
-    @Before
-    public void initialize() {
-        try {
-            testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
-        } catch (IOException e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
-    }
-
-    @Test
-    public void test1()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[*].author");
-        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
-        assertThat(result.size(), is(2));
-        assertTrue(result.get(0).getDocument() instanceof String);
-        assertTrue(result.get(1).getDocument() instanceof String);
-    }
-
-    @Test
-    public void test2()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.category == 'reference')]");
-        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
-
-    @Test
-    public void test3()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.price > 10)]");
-        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
-
-    @Test
-    public void test4()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.isbn)]");
-        List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
index b70226e..5432e82 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -72,6 +72,7 @@ public class DatasiftEventProcessor implements Runnable {
                     json = (String) item;
                     wrapper = mapper.readValue(json, ObjectNode.class);
                     datasift = mapper.convertValue(wrapper.get("data"), Datasift.class);
+                    json = mapper.writeValueAsString(datasift);
                 } else if( item instanceof Interaction ) {
                     datasift = mapper.convertValue(item, Datasift.class);
                     json = mapper.writeValueAsString(datasift);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
index f5b39ce..b22ef03 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsDateTimeDeserializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
@@ -32,7 +34,12 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper {
             {
                 addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
                     @Override
-                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+                    public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+                        DateTime dateTime;
+                        try {
+                            dateTime = RFC3339Utils.getInstance().parseUTC(jpar.getValueAsString());
+                            return dateTime;
+                        } catch( Exception e ) {};
                         return DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
                     }
                 });

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index 18c5faf..ea5d9ed 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -812,7 +812,12 @@
                     ]
                 },
                 "mention_ids": {
-                    "type": "integer"
+                    "type": "array",
+                    "items": [
+                        {
+                            "type": "integer"
+                        }
+                    ]
                 },
                 "mentions": {
                     "type": "array",
@@ -988,7 +993,12 @@
                             }
                         },
                         "mention_ids": {
-                            "type": "integer"
+                            "type": "array",
+                            "items": [
+                                {
+                                    "type": "integer"
+                                }
+                            ]
                         },
                         "mentions": {
                             "type": "array",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
index 378b906..ad19a21 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
@@ -5,6 +5,7 @@ import org.apache.streams.datasift.blog.Blog;
 import org.apache.streams.datasift.board.Board;
 import org.apache.streams.datasift.config.Facebook;
 import org.apache.streams.datasift.provider.DatasiftEventClassifier;
+import org.apache.streams.datasift.twitter.Twitter;
 import org.apache.streams.datasift.youtube.YouTube;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
@@ -18,18 +19,18 @@ import org.junit.Test;
  */
 public class DatasiftEventClassifierTest {
 
-    //private String twitter = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\
 ":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contribu
 tors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+    private String twitter = "{\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for iPhone\",\"author\":{\"username\":\"_Zaire_910\",\"name\":\"t.✌️\",\"id\":546470895,\"avatar\":\"http://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"link\":\"http://twitter.com/_Zaire_910\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Fri, 18 Apr 2014 20:07:55 +0000\",\"received_at\":1.397851676143E9,\"content\":\"Then i couldnt even eat it cuh i was high asf , then we went n picked up bri took ha to subway n shit ha n quan took a day in subway\",\"id\":\"1e3c73520e01af80e074f351c8efd5fa\",\"link\":\"http://twitter.com/_Zaire_910/status/457249186267021313\"},\"klout\":{\"score\":47},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\"confidence\":99},\"salience\":{\"content\":{\"sentiment\":-7}},\"twitter\":{\"created_at\":\"Fri, 18 Apr 2014 20:07:55 +0000\",\"filter_level\":\"medium\",\"id\":\"457249186267021313\",\"lang\":\"en\",\"sou
 rce\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"text\":\"Then i couldnt even eat it cuh i was high asf , then we went n picked up bri took ha to subway n shit ha n quan took a day in subway\",\"user\":{\"name\":\"t.✌️\",\"description\":\"free my brother , Teddy , nicc and my uncle\",\"statuses_count\":19037,\"followers_count\":1456,\"friends_count\":1541,\"screen_name\":\"_Zaire_910\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Atlantic Time (Canada)\",\"utc_offset\":-10800,\"listed_count\":0,\"id\":546470895,\"id_str\":\"546470895\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":539,\"created_at\":\"Fri, 06 Apr 2012 00:17:13 +0000\"}}}\n";
     private String facebook = "{\"facebook\":{\"author\":{\"avatar\":\"https:\\/\\/graph.facebook.com\\/100000529773118\\/picture\",\"id\":\"100000529773118\",\"link\":\"http:\\/\\/www.facebook.com\\/profile.php?id=100000529773118\",\"name\":\"Ghatholl Perkasa\"},\"caption\":\"jakartagreater.com\",\"created_at\":\"Thu, 03 Oct 2013 00:48:32 +0000\",\"description\":\"Pangdam VI Mulawarman menjelaskan, tahun 2014 mendapatkan dukungan 2 unit tank Leopard-2, serta 2 unit Helikopter Apache, menemani Helikopter Bell 412 EP\",\"id\":\"100000529773118_709380572422929\",\"link\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"message\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"name\":\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\",\"source\":\"web\",\"type\":\"link\"},\"interaction\":{\"author\":{\"avatar\":\"https:\\/\\/graph
 .facebook.com\\/100000529773118\\/picture\",\"id\":\"100000529773118\",\"link\":\"http:\\/\\/www.facebook.com\\/profile.php?id=100000529773118\",\"name\":\"Ghatholl Perkasa\"},\"content\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"created_at\":\"Thu, 03 Oct 2013 00:49:02 +0000\",\"id\":\"1e32bc586b7fa000e066c7349fcb4420\",\"link\":\"http:\\/\\/www.facebook.com\\/100000529773118_709380572422929\",\"schema\":{\"version\":3},\"source\":\"web\",\"subtype\":\"link\",\"title\":\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\",\"type\":\"facebook\"},\"links\":{\"code\":[200],\"created_at\":[\"Wed, 02 Oct 2013 14:42:46 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text\\/html\"],\"description\":[\"Pangdam VI Mulawarman menjelaskan, tahun 2014 mendapatkan dukungan 2 unit tank Leopard-2, serta 2 unit Helikopter Apache, menemani Helikopter Bell 412 EP\"],\"keywords\":[[\"He
 likopter Apache Kalimantan\"]],\"lang\":[\"en-us\"]},\"normalized_url\":[\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\"],\"retweet_count\":[0],\"title\":[\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\"],\"url\":[\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\"]},\"salience\":{\"content\":{\"sentiment\":0},\"title\":{\"sentiment\":0}}}\n";
     private String youtube = "{\"interaction\":{\"author\":{\"link\":\"http:\\/\\/youtube.com\\/y_4c0uy7ikisinxqjnquva\",\"name\":\"y_4c0uy7ikisinxqjnquva\"},\"content\":\"comenten y subo otros.\",\"contenttype\":\"html\",\"created_at\":\"Thu, 03 Oct 2013 01:54:22 +0000\",\"id\":\"1e32bc7c8499aa80e0612c55f4551676\",\"link\":\"http:\\/\\/www.youtube.com\\/watch?v=Yj6ckGTNJ7M\",\"schema\":{\"version\":3},\"title\":\"FA! Fuerte Apache NO TE CONFUNDAS 2013] DEMO\",\"type\":\"youtube\"},\"language\":{\"confidence\":62,\"tag\":\"es\"},\"salience\":{\"content\":{\"sentiment\":7},\"title\":{\"sentiment\":0}},\"youtube\":{\"author\":{\"link\":\"http:\\/\\/youtube.com\\/y_4c0uy7ikisinxqjnquva\",\"name\":\"y_4c0uy7ikisinxqjnquva\"},\"category\":\"People &#38; Blogs\",\"content\":\"comenten y subo otros.\",\"contenttype\":\"html\",\"created_at\":\"Thu, 03 Oct 2013 01:04:41 +0000\",\"duration\":\"219\",\"id\":\"1e32bc7c8499aa80e0612c55f4551676\",\"title\":\"FA! Fuerte Apache NO TE CONFUNDAS 2013
 ] DEMO\",\"type\":\"video\",\"videolink\":\"http:\\/\\/www.youtube.com\\/watch?v=Yj6ckGTNJ7M\"}}\n";
     private String board = "{\"board\":{\"anchor\":\"post8881971\",\"author\":{\"avatar\":\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\",\"link\":\"http:\\/\\/forums.techguy.org\\/members\\/496500-phantom010.html\",\"location\":\"Cyberspace\",\"name\":\"Phantom010\",\"registered\":\"Sun, 01 Mar 2009 00:00:00 +0000\",\"username\":\"Phantom010\"},\"boardname\":\"Tech Support Guy - Free help for Windows 7, Vista, XP, and more!\",\"categories\":\"Software\",\"content\":\"<div class=\\\"KonaBody\\\"><div style=\\\"margin:20px; margin-top:5px; \\\"> <div class=\\\"smallfont\\\" style=\\\"margin-bottom:2px\\\">Quote:<\\/div> <table cellpadding=\\\"6\\\" cellspacing=\\\"0\\\" border=\\\"0\\\" width=\\\"100%\\\"> <tr> <td class=\\\"alt2\\\" style=\\\"border:1px inset\\\"> <div>\\n\\t\\t\\t\\t\\tOriginally Posted by <strong>donsor<\\/strong> <a href=\\\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881771\\\" rel=\\\"nofollow\\\"><img class=\
 \\"inlineimg sprite-viewpost\\\" pagespeed_lazy_src=\\\"http:\\/\\/attach.tsgstatic.com\\/tsg\\/styles\\/common\\/blank.png\\\" width=\\\"12\\\" height=\\\"12\\\" alt=\\\"View Post\\\" src=\\\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\\\" onload=\\\"pagespeed.lazyLoadImages.loadIfVisible(this);\\\"\\/><\\/a> <\\/div> <div style=\\\"font-style:italic\\\">I tried downloading Libreoffice but encountered some problem. No big deal I'll find something else.<\\/div> <\\/td> <\\/tr> <\\/table> <\\/div>What kind of problem?<\\/div>\",\"contenttype\":\"html\",\"countrycode\":\"US\",\"crawled\":\"Sun, 23 Mar 2014 09:18:13 +0000\",\"created_at\":\"Sat, 22 Mar 2014 16:03:00 -0500\",\"domain\":\"www.techguy.org\",\"forumid\":\"469bbbf115a\",\"forumname\":\"All Other Software\",\"forumurl\":\"http:\\/\\/forums.techguy.org\\/18-all-other-software\\/\",\"gmt\":\"-5\",\"id\":\"1e3b20559a7daa00e072105aaa3de61e\",\"language\":\"English\",\"link\":\"http:\\/\\/forums.techguy.org\\/all-other-soft
 ware\\/1122437-apache-open-office.html#post8881971\",\"siteid\":\"3cbc7f773\",\"thread\":\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html\",\"threadid\":\"1122437\",\"title\":\"Apache Open Office\",\"topics\":\"Computers\",\"type\":\"post\"},\"interaction\":{\"author\":{\"avatar\":\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\",\"link\":\"http:\\/\\/forums.techguy.org\\/members\\/496500-phantom010.html\",\"location\":\"Cyberspace\",\"name\":\"Phantom010\",\"registered\":\"Sun, 01 Mar 2009 00:00:00 +0000\",\"username\":\"Phantom010\"},\"content\":\"<div class=\\\"KonaBody\\\"><div style=\\\"margin:20px; margin-top:5px; \\\"> <div class=\\\"smallfont\\\" style=\\\"margin-bottom:2px\\\">Quote:<\\/div> <table cellpadding=\\\"6\\\" cellspacing=\\\"0\\\" border=\\\"0\\\" width=\\\"100%\\\"> <tr> <td class=\\\"alt2\\\" style=\\\"border:1px inset\\\"> <div>\\n\\t\\t\\t\\t\\tOriginally Posted by <strong>donsor<\\/strong> <a href=\\\"http:\\/\\/foru
 ms.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881771\\\" rel=\\\"nofollow\\\"><img class=\\\"inlineimg sprite-viewpost\\\" pagespeed_lazy_src=\\\"http:\\/\\/attach.tsgstatic.com\\/tsg\\/styles\\/common\\/blank.png\\\" width=\\\"12\\\" height=\\\"12\\\" alt=\\\"View Post\\\" src=\\\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\\\" onload=\\\"pagespeed.lazyLoadImages.loadIfVisible(this);\\\"\\/><\\/a> <\\/div> <div style=\\\"font-style:italic\\\">I tried downloading Libreoffice but encountered some problem. No big deal I'll find something else.<\\/div> <\\/td> <\\/tr> <\\/table> <\\/div>What kind of problem?<\\/div>\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 16:03:00 -0500\",\"id\":\"1e3b20559a7daa00e072105aaa3de61e\",\"link\":\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881971\",\"schema\":{\"version\":3},\"subtype\":\"post\",\"title\":\"Apache Open Office\",\"type\":\"board\"},\"langua
 ge\":{\"confidence\":99,\"tag\":\"en\",\"tag_extended\":\"en\"},\"salience\":{\"content\":{\"sentiment\":-4},\"title\":{\"sentiment\":0}}}";
     private String blog = "{\"blog\":{\"author\":{\"name\":\"Stileex\"},\"blog\":{\"link\":\"http:\\/\\/laptop.toprealtime.net\\/\",\"title\":\"Laptop News\"},\"blogid\":\"71107731\",\"content\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning\\u00A0\\u2026\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 21:35:23 +0000\",\"domain\":\"laptop.toprealtime.net\",\"guid\":\"8921b89d1faad3d22a6e2a7ed55765e1\",\"id\":\"1e3b209dfc61af80e072ce1ee22bb088\",\"lang\":\"en\",\"link\":\"http:\\/\\/laptop.toprealtime.net\\/2014\\/msi-ge60-apache-pro-003-15-6%e2%80%b3-excellent-gaming-laptop-pressandupdate\\/\",\"parseddate\":\"Sun, 23 Mar 2014 00:21:59 +0000\",\"postid\":\"ed8205a9-2812-4fa6-a730-692b75adc9d7\",\"title\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \
 \u2013 PressAndUpdate\",\"type\":\"post\"},\"interaction\":{\"author\":{\"name\":\"Stileex\"},\"content\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning\\u00A0\\u2026\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 21:35:23 +0000\",\"id\":\"1e3b209dfc61af80e072ce1ee22bb088\",\"link\":\"http:\\/\\/laptop.toprealtime.net\\/2014\\/msi-ge60-apache-pro-003-15-6%e2%80%b3-excellent-gaming-laptop-pressandupdate\\/\",\"received_at\":4743640105474628098,\"schema\":{\"version\":3},\"subtype\":\"post\",\"title\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \\u2013 PressAndUpdate\",\"type\":\"blog\"},\"language\":{\"confidence\":99,\"tag\":\"en\",\"tag_extended\":\"en\"},\"salience\":{\"content\":{\"entities\":[{\"name\":\"MSI\",\"sentiment\":0,\"confident\":1,\"
 label\":\"Company\",\"evidence\":2,\"type\":\"Company\",\"about\":1,\"themes\":[\"fast performance\",\"great graphics\",\"audio output\",\"gaming freak\",\"gaming laptop\",\"lightweight laptop\",\"stunning \\u2026\"]}],\"sentiment\":2,\"topics\":[{\"name\":\"Video Games\",\"score\":0.64548820257187,\"additional\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning \\u2026\",\"hits\":0},{\"name\":\"Hardware\",\"score\":0.45651730895042,\"additional\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning \\u2026\",\"hits\":0}]},\"title\":{\"sentiment\":6,\"topics\":[{\"name\":\"Video Games\",
 \"score\":0.62834107875824,\"additional\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \\u2013 PressAndUpdate\",\"hits\":0}]}}}";
 
-//    @Test
-//    public void testDetectTwitter() {
-//        Class result = TwitterEventClassifier.detectClass(twitter);
-//        if( !result.equals(Tweet.class) )
-//            Assert.fail();
-//    }
+    @Test
+    public void testDetectTwitter() {
+        Class result = DatasiftEventClassifier.detectClass(twitter);
+        if( !result.equals(Twitter.class) )
+            Assert.fail();
+    }
 
     @Test
     public void testDetectFacebook() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd b/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
index 6558189..b41ecfe 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
+++ b/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
@@ -50,7 +50,7 @@
                                         <xs:element type="xs:anyURI" name="tweetJsonLink" minOccurs="0"/>
                                         <xs:element type="xs:anyURI" name="tweetHbLink" minOccurs="0"/>
                                         <xs:element type="xs:string" name="twitterFollowers" minOccurs="0"/>
-                                        <xs:element type="xs:string" name=" " minOccurs="0"/>
+                                        <xs:element type="xs:string" name="twitterFollowing" minOccurs="0"/>
                                     </xs:sequence>
                                 </xs:complexType>
                             </xs:element>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 5e88bdd..aca8dc4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -43,11 +43,16 @@ public class TwitterTypeConverter implements StreamsProcessor {
 
     public final static String TERMINATE = new String("TERMINATE");
 
+    public TwitterTypeConverter() {
+        // for pig
+    }
+
     public TwitterTypeConverter(Class inClass, Class outClass) {
         this.inClass = inClass;
         this.outClass = outClass;
     }
 
+
     public Queue<StreamsDatum> getProcessorOutputQueue() {
         return outQueue;
     }
@@ -175,9 +180,19 @@ public class TwitterTypeConverter implements StreamsProcessor {
     }
 
     @Override
-    public void prepare(Object o) {
+    public void prepare(Object configurationObject) {
         mapper = new StreamsTwitterMapper();
         twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+        if( configurationObject instanceof String[] ) {
+            // for pig
+            String[] prepareArgs = (String[]) configurationObject;
+            try {
+                inClass = Class.forName(prepareArgs[0]);
+                outClass = Class.forName(prepareArgs[1]);
+            } catch (ClassNotFoundException e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 6b61036..da17af6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -24,6 +24,18 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
 
     public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
 
+    public static final Long getMillis(String dateTime) {
+
+        // this function is for pig which doesn't handle exceptions well
+        try {
+            Long result = TWITTER_FORMAT.parseMillis(dateTime);
+            return result;
+        } catch( Exception e ) {
+            return null;
+        }
+
+    }
+
     private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper();
 
     public static StreamsTwitterMapper getInstance(){

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 51e39b1..0d13eaa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -87,7 +87,7 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
         } catch( Exception e ) {
             throw new ActivitySerializerException("Unable to determine content", e);
         }
-        activity.setUrl("http://twitter.com/" + retweet.getIdStr());
+        activity.setUrl("http://twitter.com/" + retweet.getUser().getIdStr() + "/status/" + retweet.getIdStr());
         activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet.getRetweetedStatus()));
         addTwitterExtension(activity, mapper.convertValue(retweet, ObjectNode.class));
         addLocationExtension(activity, retweet);
@@ -130,7 +130,6 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
     public static void addLocationExtension(Activity activity, Retweet retweet) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = new HashMap<String, Object>();
-        location.put("id", TwitterJsonActivitySerializer.formatId(retweet.getIdStr()));
         location.put("coordinates", retweet.getCoordinates());
         extensions.put("location", location);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index e830884..74c011a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -82,7 +82,7 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
         activity.setProvider(getProvider());
         activity.setTitle("");
         activity.setContent(tweet.getText());
-        activity.setUrl("http://twitter.com/" + tweet.getIdStr());
+        activity.setUrl("http://twitter.com/" + tweet.getUser().getIdStr() + "/status/" + tweet.getIdStr());
         activity.setLinks(getLinks(tweet));
 
         addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
@@ -132,12 +132,6 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
     public static void addLocationExtension(Activity activity, Tweet tweet) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = new HashMap<String, Object>();
-        location.put("id", formatId(
-                Optional.fromNullable(
-                        tweet.getIdStr())
-                        .or(Optional.of(tweet.getId().toString()))
-                        .orNull()
-        ));
         location.put("coordinates", tweet.getCoordinates());
         extensions.put("location", location);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
index 3f43477..c0768e3 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -12,6 +12,7 @@ import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
 
 import java.util.List;
 
@@ -46,42 +47,45 @@ public class TwitterJsonUserstreameventActivitySerializer implements ActivitySer
         return null;
     }
 
-    public Activity convert(ObjectNode event) throws ActivitySerializerException {
+    public Activity convert(ObjectNode item) throws ActivitySerializerException {
 
         ObjectMapper mapper = StreamsTwitterMapper.getInstance();
-        Delete delete = null;
+        UserstreamEvent event = null;
         try {
-            delete = mapper.treeToValue(event, Delete.class);
+            event = mapper.treeToValue(item, UserstreamEvent.class);
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         }
 
         Activity activity = new Activity();
-        activity.setActor(buildActor(delete));
-        activity.setVerb("delete");
-        activity.setObject(buildActivityObject(delete));
-        activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+        activity.setActor(buildActor(event));
+        activity.setVerb(detectVerb(event));
+        activity.setObject(buildActivityObject(event));
+        activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb()));
         if(Strings.isNullOrEmpty(activity.getId()))
             throw new ActivitySerializerException("Unable to determine activity id");
         activity.setProvider(getProvider());
-        addTwitterExtension(activity, event);
         return activity;
     }
 
-    public Actor buildActor(Delete delete) {
+    public Actor buildActor(UserstreamEvent event) {
         Actor actor = new Actor();
-        actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+        //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
         return actor;
     }
 
-    public ActivityObject buildActivityObject(Delete delete) {
+    public ActivityObject buildActivityObject(UserstreamEvent event) {
         ActivityObject actObj = new ActivityObject();
-        actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
-        actObj.setObjectType("tweet");
+        //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+        //actObj.setObjectType("tweet");
         return actObj;
     }
 
-    public ActivityObject buildTarget(Tweet tweet) {
+    public String detectVerb(UserstreamEvent event) {
+        return null;
+    }
+
+    public ActivityObject buildTarget(UserstreamEvent event) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
index 652567e..07b5883 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
@@ -10,9 +10,23 @@
         },
         "event_type": {
             "type": "string",
-            "items": {
-                "type": "integer"
-            }
+            "enum" : [
+                "access_revoked",
+                "block",
+                "unblock",
+                "favorite",
+                "unfavorite",
+                "follow",
+                "unfollow",
+                "list_created",
+                "list_destroyed",
+                "list_updated",
+                "list_member_added",
+                "list_member_removed",
+                "list_user_subscribed",
+                "list_user_unsubscribed",
+                "user_update"
+            ]
         },
         "source": {
             "type": "string",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index 1f6c86d..241aaed 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -45,8 +45,6 @@ public class TweetSerDeTest {
     public void Tests()
     {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
 
         InputStream is = TweetSerDeTest.class.getResourceAsStream("/testtweets.txt");
         InputStreamReader isr = new InputStreamReader(is);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 78623b0..9866697 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -117,6 +117,10 @@ public class StreamsDatum implements Serializable {
         return id;
     }
 
+    public void setId(String id) {
+        this.id = id;
+    }
+
     @Override
     public boolean equals(Object o) {
         if(o instanceof StreamsDatum) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index f9934b6..f489911 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -32,7 +32,7 @@ public class StreamsJacksonMapper extends ObjectMapper {
         super();
         registerModule(new StreamsJacksonModule());
         disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-        configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
         configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
         configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
         configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
@@ -42,6 +42,7 @@ public class StreamsJacksonMapper extends ObjectMapper {
         configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
         configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, Boolean.FALSE);
         configure(SerializationFeature.WRITE_NULL_MAP_VALUES, Boolean.FALSE);
+        configure(SerializationFeature.WRITE_NULL_MAP_VALUES, Boolean.FALSE);
         setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.DEFAULT);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index bcf0f65..6fcc1f1 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
+import java.util.Map;
+
 /**
  * Created by sblackmon on 3/27/14.
  */
@@ -16,6 +18,9 @@ public class StreamsJacksonModule extends SimpleModule {
 
         addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
         addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
+
+        addSerializer(Map.class, new StreamsMapSerializer(Map.class));
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index e62a7c8..2b0a9c6 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -29,8 +29,8 @@
     <artifactId>streams-runtime-pig</artifactId>
 
     <properties>
-        <hadoop-client.version>2.0.0-cdh4.5.0.1-SNAPSHOT</hadoop-client.version>
-        <pig.version>0.11.0-cdh4.5.0.1-SNAPSHOT</pig.version>
+        <hadoop-client.version>2.0.0-cdh4.5.0</hadoop-client.version>
+        <pig.version>0.11.0-cdh4.5.0</pig.version>
     </properties>
 
     <dependencies>
@@ -44,18 +44,18 @@
             <artifactId>streams-core</artifactId>
             <version>0.1-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-runtime-local</artifactId>
-            <version>0.1-SNAPSHOT</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-runtime-local</artifactId>
-            <version>0.1-SNAPSHOT</version>
-            <scope>test-jar</scope>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.streams</groupId>-->
+            <!--<artifactId>streams-runtime-local</artifactId>-->
+            <!--<version>0.1-SNAPSHOT</version>-->
+            <!--<scope>test-jar</scope>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+            <!--<groupId>org.apache.streams</groupId>-->
+            <!--<artifactId>streams-runtime-local</artifactId>-->
+            <!--<version>0.1-SNAPSHOT</version>-->
+            <!--<scope>test</scope>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-provider-twitter</artifactId>
@@ -89,6 +89,10 @@
             <artifactId>pigunit</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.linkedin.datafu</groupId>
+            <artifactId>datafu</artifactId>
+        </dependency>
+        <dependency>
             <groupId>jline</groupId>
             <artifactId>jline</artifactId>
             <version>2.11</version>
@@ -122,6 +126,13 @@
                 <version>${pig.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>com.linkedin.datafu</groupId>
+                <artifactId>datafu</artifactId>
+                <version>1.2.0</version>
+                <scope>compile</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 6b4705d..2f7538d 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -2,6 +2,7 @@ package org.apache.streams.pig;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import datafu.pig.util.AliasableEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
@@ -10,9 +11,12 @@ import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.DateTime;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -23,7 +27,7 @@ import java.util.concurrent.TimeUnit;
  * Created by sblackmon on 3/25/14.
  */
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
+public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
 
     TupleFactory mTupleFactory = TupleFactory.getInstance();
     BagFactory mBagFactory = BagFactory.getInstance();
@@ -46,19 +50,26 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
     }
 
     @Override
-    public DataBag exec(Tuple line) throws IOException {
+    public DataBag exec(Tuple input) throws IOException {
 
-        if (line == null || line.size() == 0)
+        if (input == null || input.size() == 0)
             return null;
 
+        DataBag output = BagFactory.getInstance().newDefaultBag();
+
         Configuration conf = UDFContext.getUDFContext().getJobConf();
 
-        String id = (String)line.get(0);
-        String provider = (String)line.get(1);
-        Long timestamp = (Long)line.get(2);
-        String object = (String)line.get(3);
+        String id = getString(input, "id");
+        String source = getString(input, "source");
+        Long timestamp;
+        try {
+            timestamp = getLong(input, "timestamp");
+        } catch( Exception e ) {
+            timestamp = RFC3339Utils.parseUTC(getString(input, "timestamp")).getMillis();
+        }
+        String object = getString(input, "object");
 
-        StreamsDatum entry = new StreamsDatum(object);
+        StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
 
         List<StreamsDatum> resultSet = streamsProcessor.process(entry);
         List<Tuple> resultTupleList = Lists.newArrayList();
@@ -66,7 +77,7 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
         for( StreamsDatum resultDatum : resultSet ) {
             Tuple tuple = mTupleFactory.newTuple();
             tuple.append(id);
-            tuple.append(provider);
+            tuple.append(source);
             tuple.append(timestamp);
             tuple.append(resultDatum.getDocument());
             resultTupleList.add(tuple);
@@ -81,4 +92,9 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
     public void finish() {
         streamsProcessor.cleanUp();
     }
+
+    @Override
+    public Schema getOutputSchema(Schema schema) {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 5b0890f..5455fd9 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import datafu.pig.util.SimpleEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
@@ -28,79 +29,53 @@ import java.util.concurrent.TimeUnit;
  * Created by sblackmon on 3/25/14.
  */
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessDocumentExec extends EvalFunc<String> {
-
-    TupleFactory mTupleFactory = TupleFactory.getInstance();
-    BagFactory mBagFactory = BagFactory.getInstance();
+public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
 
     StreamsProcessor streamsProcessor;
     ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
-        System.out.println("A");
         Preconditions.checkNotNull(execArgs);
-        System.out.println(Arrays.toString(execArgs));
-        System.out.println("B");
         Preconditions.checkArgument(execArgs.length > 0);
-        System.out.println(execArgs.length);
         String classFullName = execArgs[0];
-        System.out.println("C");
         Preconditions.checkNotNull(classFullName);
-        System.out.println("E");
+        String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
         streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
         if( execArgs.length == 1 ) {
-            System.out.println("PREPARE NULL");
             streamsProcessor.prepare(null);
-        } else if( execArgs.length == 2 ) {
-            System.out.println("PREPARE 1 " + execArgs[1]);
-            streamsProcessor.prepare(execArgs[1]);
-        } else {
-            System.out.println("PREPARE ARRAY " + Arrays.toString(execArgs));
-            streamsProcessor.prepare(execArgs);
+        } else if( execArgs.length > 1 ) {
+            streamsProcessor.prepare(prepareArgs);
         }
-        System.out.println("F");
     }
 
-    @Override
-    public String exec(Tuple input) throws IOException {
+    public String call(String input) throws IOException {
 
-        System.out.println("H");
         Preconditions.checkNotNull(streamsProcessor);
-        Preconditions.checkNotNull(input);
-        Preconditions.checkArgument(input.size() == 1);
-        System.out.println("I");
-
-        String document = (String) input.get(0);
-
-        Preconditions.checkNotNull(document);
 
-        // System.out.println(document);
+        try {
 
-        StreamsDatum entry = new StreamsDatum(document);
+            Preconditions.checkNotNull(input);
 
-        Preconditions.checkNotNull(entry);
+            StreamsDatum entry = new StreamsDatum(input);
 
-        // System.out.println(entry);
+            Preconditions.checkNotNull(entry);
 
-        List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+            List<StreamsDatum> resultSet = streamsProcessor.process(entry);
 
-        System.out.println(resultSet);
+            Object resultDoc = null;
+            resultDoc = resultSet.get(0).getDocument();
 
-        Object resultDoc = null;
-        for( StreamsDatum resultDatum : resultSet ) {
-            resultDoc = resultDatum.getDocument();
-        }
-
-        Preconditions.checkNotNull(resultDoc);
+            Preconditions.checkNotNull(resultDoc);
 
-        System.out.println(resultDoc);
+            if( resultDoc instanceof String )
+                return (String) resultDoc;
+            else
+                return mapper.writeValueAsString(resultDoc);
 
-        if( resultDoc instanceof String )
-            return (String) resultDoc;
-        else if( resultDoc instanceof ObjectNode)
-            return mapper.writeValueAsString(resultDoc);
-        else
+        } catch (Exception e) {
+            System.err.println("Error with " + input.toString());
             return null;
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 67f3c61..50c7fab 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -3,6 +3,7 @@ package org.apache.streams.pig;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import datafu.pig.util.SimpleEvalFunc;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
  * Created by sblackmon on 3/25/14.
  */
 @MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
-public class StreamsSerializerExec extends EvalFunc<String> {
+public class StreamsSerializerExec extends SimpleEvalFunc<String> {
 
     ActivitySerializer activitySerializer;
     ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -42,26 +43,24 @@ public class StreamsSerializerExec extends EvalFunc<String> {
         activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName));
     }
 
-    @Override
-    public String exec(Tuple input) throws IOException {
+    public String call(String input) throws IOException {
 
         Preconditions.checkNotNull(activitySerializer);
         Preconditions.checkNotNull(input);
-        Preconditions.checkArgument(input.size() == 1);
-        Configuration conf = UDFContext.getUDFContext().getJobConf();
 
-        String document = (String) input.get(0);
-
-        Preconditions.checkNotNull(document);
         Activity activity = null;
         try {
-            activity = activitySerializer.deserialize(document);
+            activity = activitySerializer.deserialize(input);
+
+            Preconditions.checkNotNull(activity);
+
+            return mapper.writeValueAsString(activity);
+
         } catch( Exception e ) {
             e.printStackTrace();
-        }
-        Preconditions.checkNotNull(activity);
 
-        return mapper.writeValueAsString(activity);
+            return null;
+        }
 
     }