You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:39 UTC
[52/52] [abbrv] git commit: sblackmon changes to sblackmon branch
sblackmon changes to sblackmon branch
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/040e2c4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/040e2c4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/040e2c4c
Branch: refs/heads/sblackmon
Commit: 040e2c4c605f88aebcaa5979ed6a58aeed84fa49
Parents: ef59fa4
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Apr 29 15:10:25 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Apr 29 15:10:25 2014 -0500
----------------------------------------------------------------------
.../streams/console/ConsolePersistWriter.java | 2 +
.../streams-persist-elasticsearch/pom.xml | 2 +-
.../ElasticsearchPersistWriter.java | 64 ++++++++------
.../streams/hdfs/WebHdfsPersistReaderTask.java | 3 +-
.../apache/streams/json/JsonPathExtractor.java | 6 +-
.../json/test/JsonPathExtractorTest.java | 88 +++++++++++++++++++
.../apache/streams/json/test/JsonPathTest.java | 92 --------------------
.../provider/DatasiftEventProcessor.java | 1 +
.../serializer/StreamsDatasiftMapper.java | 9 +-
.../main/jsonschema/com/datasift/Datasift.json | 14 ++-
.../test/DatasiftEventClassifierTest.java | 15 ++--
.../src/main/xmlschema/com/sysomos/sysomos.xsd | 2 +-
.../twitter/processor/TwitterTypeConverter.java | 17 +++-
.../serializer/StreamsTwitterMapper.java | 12 +++
.../TwitterJsonRetweetActivitySerializer.java | 3 +-
.../TwitterJsonTweetActivitySerializer.java | 8 +-
...erJsonUserstreameventActivitySerializer.java | 32 ++++---
.../jsonschema/com/twitter/UserstreamEvent.json | 20 ++++-
.../streams/twitter/test/TweetSerDeTest.java | 2 -
.../org/apache/streams/core/StreamsDatum.java | 4 +
.../streams/jackson/StreamsJacksonMapper.java | 3 +-
.../streams/jackson/StreamsJacksonModule.java | 5 ++
streams-runtimes/streams-runtime-pig/pom.xml | 39 ++++++---
.../streams/pig/StreamsProcessDatumExec.java | 34 ++++++--
.../streams/pig/StreamsProcessDocumentExec.java | 67 +++++---------
.../streams/pig/StreamsSerializerExec.java | 23 +++--
.../src/test/java/PigProcessorTest.java | 32 -------
.../src/test/java/PigSerializerTest.java | 40 ---------
.../streams/pig/test/PigProcessDatumTest.java | 49 +++++++++++
.../streams/pig/test/PigSerializerTest.java | 46 ++++++++++
.../src/test/resources/pigserializertest.pig | 9 +-
.../org/apache/streams/util/ComponentUtils.java | 16 ++++
32 files changed, 438 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index e8efc6d..1f04ed3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -13,6 +13,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class ConsolePersistWriter implements StreamsPersistWriter {
+ public final static String STREAMS_ID = "ConsolePersistWriter";
+
private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
protected volatile Queue<StreamsDatum> persistQueue;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 95f3357..12371b2 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -12,7 +12,7 @@
<artifactId>streams-persist-elasticsearch</artifactId>
<properties>
- <elasticsearch.version>1.1.0</elasticsearch.version>
+ <elasticsearch.version>1.1.1</elasticsearch.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 405d1b8..71a42ea 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -7,6 +7,7 @@ import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
+import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -17,6 +18,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
@@ -24,6 +26,8 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
+import org.joda.time.DateTime;
+import org.joda.time.format.ISODateTimeFormat;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
@@ -118,13 +122,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
String json;
try {
String id = streamsDatum.getId();
+ String ts = Long.toString(streamsDatum.getTimestamp().getMillis());
if( streamsDatum.getDocument() instanceof String )
json = streamsDatum.getDocument().toString();
else {
json = mapper.writeValueAsString(streamsDatum.getDocument());
}
- add(config.getIndex(), config.getType(), id, json);
+ add(config.getIndex(), config.getType(), id, json, ts);
} catch (Exception e) {
LOGGER.warn("{} {}", e.getMessage());
@@ -333,20 +338,24 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
public void add(String indexName, String type, String json)
{
- add(indexName, type, null, json);
+ add(indexName, type, null, json, null);
}
- public void add(String indexName, String type, String id, String json)
+ public void add(String indexName, String type, String id, String json, String ts)
{
- IndexRequest indexRequest;
+ IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(manager.getClient());
- // They didn't specify an ID, so we will create one for them.
- if(id == null)
- indexRequest = new IndexRequest(indexName, type);
- else
- indexRequest = new IndexRequest(indexName, type, id);
+ indexRequestBuilder.setIndex(indexName);
+ indexRequestBuilder.setType(type);
- indexRequest.source(json);
+ indexRequestBuilder.setSource(json);
+
+ // / They didn't specify an ID, so we will create one for them.
+ if(id != null)
+ indexRequestBuilder.setId(id);
+
+ if(ts != null)
+ indexRequestBuilder.setTimestamp(ts);
// If there is a parentID that is associated with this bulk, then we are
// going to have to parse the raw JSON and attempt to dereference
@@ -357,7 +366,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
{
// The JSONObject constructor can throw an exception, it is called
// out explicitly here so we can catch it.
- indexRequest = indexRequest.parent(new JSONObject(json).getString(parentID));
+ indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
}
catch(JSONException e)
{
@@ -365,7 +374,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
totalFailed++;
}
}
- add(indexRequest);
+ add(indexRequestBuilder.request());
}
public void add(UpdateRequest updateRequest)
@@ -479,22 +488,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
}
}
- public void add(String indexName, String type, Map<String, Object> toImport)
- {
- for (String id : toImport.keySet())
- add(indexName, type, id, (String)toImport.get(id));
- }
- private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
- {
- Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
- for(String toAddId : workingBatch.keySet())
- if(!invalidIDs.contains(toAddId))
- add(index, type, toAddId, workingBatch.get(toAddId));
-
- LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
- }
+// public void add(String indexName, String type, Map<String, Object> toImport)
+// {
+// for (String id : toImport.keySet())
+// add(indexName, type, id, (String)toImport.get(id));
+// }
+//
+// private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
+// {
+// Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
+//
+// for(String toAddId : workingBatch.keySet())
+// if(!invalidIDs.contains(toAddId))
+// add(index, type, toAddId, workingBatch.get(toAddId));
+//
+// LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
+// }
private Set<String> checkIds(Set<String> input, String index, String type) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index b04350e..65bd6dc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -4,6 +4,7 @@ import com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
write( entry );
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index 3059b1e..eab1da1 100644
--- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -24,7 +24,7 @@ import java.util.List;
public class JsonPathExtractor implements StreamsProcessor {
public JsonPathExtractor() {
- System.out.println("creating JsonPathExtractor for nada");
+ System.out.println("creating JsonPathExtractor");
}
public JsonPathExtractor(String pathExpression) {
@@ -85,8 +85,7 @@ public class JsonPathExtractor implements StreamsProcessor {
StreamsDatum matchDatum = new StreamsDatum(match);
result.add(matchDatum);
} else if ( item instanceof JSONObject ) {
- ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(item), ObjectNode.class);
- StreamsDatum matchDatum = new StreamsDatum(objectNode);
+ StreamsDatum matchDatum = new StreamsDatum(item);
result.add(matchDatum);
}
}
@@ -109,6 +108,7 @@ public class JsonPathExtractor implements StreamsProcessor {
public void prepare(Object configurationObject) {
if( configurationObject instanceof String )
jsonPath = JsonPath.compile((String)(configurationObject));
+
mapper.registerModule(new JsonOrgModule());
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
new file mode 100644
index 0000000..2484939
--- /dev/null
+++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -0,0 +1,88 @@
+package org.apache.streams.json.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.json.JsonPathExtractor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class JsonPathExtractorTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathExtractorTest.class);
+
+ private String testJson;
+
+ @Before
+ public void initialize() {
+ try {
+ testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void test1()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[*].author");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(2));
+ assertTrue(result.get(0).getDocument() instanceof String);
+ assertTrue(result.get(1).getDocument() instanceof String);
+ }
+
+ @Ignore
+ @Test
+ public void test2()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.category == 'reference')]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+ @Ignore
+ @Test
+ public void test3()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.price > 10)]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+ @Ignore
+ @Test
+ public void test4()
+ {
+ JsonPathExtractor extractor = new JsonPathExtractor();
+ extractor.prepare("$.store.book[?(@.isbn)]");
+ List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+ assertThat(result.size(), is(1));
+ assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
deleted file mode 100644
index fe3a41b..0000000
--- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.streams.json.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.json.JsonPathExtractor;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.List;
-import java.util.Scanner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class JsonPathTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(JsonPathTest.class);
-
- private String testJson;
-
- @Before
- public void initialize() {
- try {
- testJson = FileUtils.readFileToString(new File("src/test/resources/books.json"));
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @Test
- public void test1()
- {
- JsonPathExtractor extractor = new JsonPathExtractor();
- extractor.prepare("$.store.book[*].author");
- List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
- assertThat(result.size(), is(2));
- assertTrue(result.get(0).getDocument() instanceof String);
- assertTrue(result.get(1).getDocument() instanceof String);
- }
-
- @Test
- public void test2()
- {
- JsonPathExtractor extractor = new JsonPathExtractor();
- extractor.prepare("$.store.book[?(@.category == 'reference')]");
- List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
- assertThat(result.size(), is(1));
- assertTrue(result.get(0).getDocument() instanceof ObjectNode);
- }
-
- @Test
- public void test3()
- {
- JsonPathExtractor extractor = new JsonPathExtractor();
- extractor.prepare("$.store.book[?(@.price > 10)]");
- List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
- assertThat(result.size(), is(1));
- assertTrue(result.get(0).getDocument() instanceof ObjectNode);
- }
-
- @Test
- public void test4()
- {
- JsonPathExtractor extractor = new JsonPathExtractor();
- extractor.prepare("$.store.book[?(@.isbn)]");
- List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
- assertThat(result.size(), is(1));
- assertTrue(result.get(0).getDocument() instanceof ObjectNode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
index b70226e..5432e82 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -72,6 +72,7 @@ public class DatasiftEventProcessor implements Runnable {
json = (String) item;
wrapper = mapper.readValue(json, ObjectNode.class);
datasift = mapper.convertValue(wrapper.get("data"), Datasift.class);
+ json = mapper.writeValueAsString(datasift);
} else if( item instanceof Interaction ) {
datasift = mapper.convertValue(item, Datasift.class);
json = mapper.writeValueAsString(datasift);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
index f5b39ce..b22ef03 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/StreamsDatasiftMapper.java
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsDateTimeDeserializer;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -32,7 +34,12 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper {
{
addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) {
@Override
- public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException {
+ public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
+ DateTime dateTime;
+ try {
+ dateTime = RFC3339Utils.getInstance().parseUTC(jpar.getValueAsString());
+ return dateTime;
+ } catch( Exception e ) {};
return DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString());
}
});
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index 18c5faf..ea5d9ed 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -812,7 +812,12 @@
]
},
"mention_ids": {
- "type": "integer"
+ "type": "array",
+ "items": [
+ {
+ "type": "integer"
+ }
+ ]
},
"mentions": {
"type": "array",
@@ -988,7 +993,12 @@
}
},
"mention_ids": {
- "type": "integer"
+ "type": "array",
+ "items": [
+ {
+ "type": "integer"
+ }
+ ]
},
"mentions": {
"type": "array",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java b/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
index 378b906..ad19a21 100644
--- a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
+++ b/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftEventClassifierTest.java
@@ -5,6 +5,7 @@ import org.apache.streams.datasift.blog.Blog;
import org.apache.streams.datasift.board.Board;
import org.apache.streams.datasift.config.Facebook;
import org.apache.streams.datasift.provider.DatasiftEventClassifier;
+import org.apache.streams.datasift.twitter.Twitter;
import org.apache.streams.datasift.youtube.YouTube;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
@@ -18,18 +19,18 @@ import org.junit.Test;
*/
public class DatasiftEventClassifierTest {
- //private String twitter = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682381615105,\"id_str\":\"410898682381615105\",\"text\":\"Men's Basketball Single-Game Tickets Available - A limited number of tickets remain for Kentucky's upcoming men's ... http:\\/\\/t.co\\/SH5YZGpdRx\",\"source\":\"\\u003ca href=\\\"http:\\/\\/www.hootsuite.com\\\" rel=\\\"nofollow\\\"\\u003eHootSuite\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"91407775\",\"name\":\"Winchester, KY\",\"screen_name\":\"winchester_ky\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\
":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme1\\/bg.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/613854495\\/winchester_sociallogo_normal.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contribu
tors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/SH5YZGpdRx\",\"expanded_url\":\"http:\\/\\/ow.ly\\/2C2XL1\",\"display_url\":\"ow.ly\\/2C2XL1\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n";
+ private String twitter = "{\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for iPhone\",\"author\":{\"username\":\"_Zaire_910\",\"name\":\"t.✌️\",\"id\":546470895,\"avatar\":\"http://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"link\":\"http://twitter.com/_Zaire_910\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Fri, 18 Apr 2014 20:07:55 +0000\",\"received_at\":1.397851676143E9,\"content\":\"Then i couldnt even eat it cuh i was high asf , then we went n picked up bri took ha to subway n shit ha n quan took a day in subway\",\"id\":\"1e3c73520e01af80e074f351c8efd5fa\",\"link\":\"http://twitter.com/_Zaire_910/status/457249186267021313\"},\"klout\":{\"score\":47},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\"confidence\":99},\"salience\":{\"content\":{\"sentiment\":-7}},\"twitter\":{\"created_at\":\"Fri, 18 Apr 2014 20:07:55 +0000\",\"filter_level\":\"medium\",\"id\":\"457249186267021313\",\"lang\":\"en\",\"sou
rce\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone</a>\",\"text\":\"Then i couldnt even eat it cuh i was high asf , then we went n picked up bri took ha to subway n shit ha n quan took a day in subway\",\"user\":{\"name\":\"t.✌️\",\"description\":\"free my brother , Teddy , nicc and my uncle\",\"statuses_count\":19037,\"followers_count\":1456,\"friends_count\":1541,\"screen_name\":\"_Zaire_910\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/456939014537232384/2INEvVhl_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Atlantic Time (Canada)\",\"utc_offset\":-10800,\"listed_count\":0,\"id\":546470895,\"id_str\":\"546470895\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":539,\"created_at\":\"Fri, 06 Apr 2012 00:17:13 +0000\"}}}\n";
private String facebook = "{\"facebook\":{\"author\":{\"avatar\":\"https:\\/\\/graph.facebook.com\\/100000529773118\\/picture\",\"id\":\"100000529773118\",\"link\":\"http:\\/\\/www.facebook.com\\/profile.php?id=100000529773118\",\"name\":\"Ghatholl Perkasa\"},\"caption\":\"jakartagreater.com\",\"created_at\":\"Thu, 03 Oct 2013 00:48:32 +0000\",\"description\":\"Pangdam VI Mulawarman menjelaskan, tahun 2014 mendapatkan dukungan 2 unit tank Leopard-2, serta 2 unit Helikopter Apache, menemani Helikopter Bell 412 EP\",\"id\":\"100000529773118_709380572422929\",\"link\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"message\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"name\":\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\",\"source\":\"web\",\"type\":\"link\"},\"interaction\":{\"author\":{\"avatar\":\"https:\\/\\/graph
.facebook.com\\/100000529773118\\/picture\",\"id\":\"100000529773118\",\"link\":\"http:\\/\\/www.facebook.com\\/profile.php?id=100000529773118\",\"name\":\"Ghatholl Perkasa\"},\"content\":\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\",\"created_at\":\"Thu, 03 Oct 2013 00:49:02 +0000\",\"id\":\"1e32bc586b7fa000e066c7349fcb4420\",\"link\":\"http:\\/\\/www.facebook.com\\/100000529773118_709380572422929\",\"schema\":{\"version\":3},\"source\":\"web\",\"subtype\":\"link\",\"title\":\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\",\"type\":\"facebook\"},\"links\":{\"code\":[200],\"created_at\":[\"Wed, 02 Oct 2013 14:42:46 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text\\/html\"],\"description\":[\"Pangdam VI Mulawarman menjelaskan, tahun 2014 mendapatkan dukungan 2 unit tank Leopard-2, serta 2 unit Helikopter Apache, menemani Helikopter Bell 412 EP\"],\"keywords\":[[\"He
likopter Apache Kalimantan\"]],\"lang\":[\"en-us\"]},\"normalized_url\":[\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\"],\"retweet_count\":[0],\"title\":[\"Helikopter Apache dan Tank Leopard Temani Bell 412 EP di Kalimantan | JakartaGreater\"],\"url\":[\"http:\\/\\/jakartagreater.com\\/helikopter-apache-dan-tank-leopard-temani-bell-412-ep-di-kalimantan\\/\"]},\"salience\":{\"content\":{\"sentiment\":0},\"title\":{\"sentiment\":0}}}\n";
private String youtube = "{\"interaction\":{\"author\":{\"link\":\"http:\\/\\/youtube.com\\/y_4c0uy7ikisinxqjnquva\",\"name\":\"y_4c0uy7ikisinxqjnquva\"},\"content\":\"comenten y subo otros.\",\"contenttype\":\"html\",\"created_at\":\"Thu, 03 Oct 2013 01:54:22 +0000\",\"id\":\"1e32bc7c8499aa80e0612c55f4551676\",\"link\":\"http:\\/\\/www.youtube.com\\/watch?v=Yj6ckGTNJ7M\",\"schema\":{\"version\":3},\"title\":\"FA! Fuerte Apache NO TE CONFUNDAS 2013] DEMO\",\"type\":\"youtube\"},\"language\":{\"confidence\":62,\"tag\":\"es\"},\"salience\":{\"content\":{\"sentiment\":7},\"title\":{\"sentiment\":0}},\"youtube\":{\"author\":{\"link\":\"http:\\/\\/youtube.com\\/y_4c0uy7ikisinxqjnquva\",\"name\":\"y_4c0uy7ikisinxqjnquva\"},\"category\":\"People & Blogs\",\"content\":\"comenten y subo otros.\",\"contenttype\":\"html\",\"created_at\":\"Thu, 03 Oct 2013 01:04:41 +0000\",\"duration\":\"219\",\"id\":\"1e32bc7c8499aa80e0612c55f4551676\",\"title\":\"FA! Fuerte Apache NO TE CONFUNDAS 2013
] DEMO\",\"type\":\"video\",\"videolink\":\"http:\\/\\/www.youtube.com\\/watch?v=Yj6ckGTNJ7M\"}}\n";
private String board = "{\"board\":{\"anchor\":\"post8881971\",\"author\":{\"avatar\":\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\",\"link\":\"http:\\/\\/forums.techguy.org\\/members\\/496500-phantom010.html\",\"location\":\"Cyberspace\",\"name\":\"Phantom010\",\"registered\":\"Sun, 01 Mar 2009 00:00:00 +0000\",\"username\":\"Phantom010\"},\"boardname\":\"Tech Support Guy - Free help for Windows 7, Vista, XP, and more!\",\"categories\":\"Software\",\"content\":\"<div class=\\\"KonaBody\\\"><div style=\\\"margin:20px; margin-top:5px; \\\"> <div class=\\\"smallfont\\\" style=\\\"margin-bottom:2px\\\">Quote:<\\/div> <table cellpadding=\\\"6\\\" cellspacing=\\\"0\\\" border=\\\"0\\\" width=\\\"100%\\\"> <tr> <td class=\\\"alt2\\\" style=\\\"border:1px inset\\\"> <div>\\n\\t\\t\\t\\t\\tOriginally Posted by <strong>donsor<\\/strong> <a href=\\\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881771\\\" rel=\\\"nofollow\\\"><img class=\
\\"inlineimg sprite-viewpost\\\" pagespeed_lazy_src=\\\"http:\\/\\/attach.tsgstatic.com\\/tsg\\/styles\\/common\\/blank.png\\\" width=\\\"12\\\" height=\\\"12\\\" alt=\\\"View Post\\\" src=\\\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\\\" onload=\\\"pagespeed.lazyLoadImages.loadIfVisible(this);\\\"\\/><\\/a> <\\/div> <div style=\\\"font-style:italic\\\">I tried downloading Libreoffice but encountered some problem. No big deal I'll find something else.<\\/div> <\\/td> <\\/tr> <\\/table> <\\/div>What kind of problem?<\\/div>\",\"contenttype\":\"html\",\"countrycode\":\"US\",\"crawled\":\"Sun, 23 Mar 2014 09:18:13 +0000\",\"created_at\":\"Sat, 22 Mar 2014 16:03:00 -0500\",\"domain\":\"www.techguy.org\",\"forumid\":\"469bbbf115a\",\"forumname\":\"All Other Software\",\"forumurl\":\"http:\\/\\/forums.techguy.org\\/18-all-other-software\\/\",\"gmt\":\"-5\",\"id\":\"1e3b20559a7daa00e072105aaa3de61e\",\"language\":\"English\",\"link\":\"http:\\/\\/forums.techguy.org\\/all-other-soft
ware\\/1122437-apache-open-office.html#post8881971\",\"siteid\":\"3cbc7f773\",\"thread\":\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html\",\"threadid\":\"1122437\",\"title\":\"Apache Open Office\",\"topics\":\"Computers\",\"type\":\"post\"},\"interaction\":{\"author\":{\"avatar\":\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\",\"link\":\"http:\\/\\/forums.techguy.org\\/members\\/496500-phantom010.html\",\"location\":\"Cyberspace\",\"name\":\"Phantom010\",\"registered\":\"Sun, 01 Mar 2009 00:00:00 +0000\",\"username\":\"Phantom010\"},\"content\":\"<div class=\\\"KonaBody\\\"><div style=\\\"margin:20px; margin-top:5px; \\\"> <div class=\\\"smallfont\\\" style=\\\"margin-bottom:2px\\\">Quote:<\\/div> <table cellpadding=\\\"6\\\" cellspacing=\\\"0\\\" border=\\\"0\\\" width=\\\"100%\\\"> <tr> <td class=\\\"alt2\\\" style=\\\"border:1px inset\\\"> <div>\\n\\t\\t\\t\\t\\tOriginally Posted by <strong>donsor<\\/strong> <a href=\\\"http:\\/\\/foru
ms.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881771\\\" rel=\\\"nofollow\\\"><img class=\\\"inlineimg sprite-viewpost\\\" pagespeed_lazy_src=\\\"http:\\/\\/attach.tsgstatic.com\\/tsg\\/styles\\/common\\/blank.png\\\" width=\\\"12\\\" height=\\\"12\\\" alt=\\\"View Post\\\" src=\\\"http:\\/\\/www.gstatic.com\\/psa\\/static\\/1.gif\\\" onload=\\\"pagespeed.lazyLoadImages.loadIfVisible(this);\\\"\\/><\\/a> <\\/div> <div style=\\\"font-style:italic\\\">I tried downloading Libreoffice but encountered some problem. No big deal I'll find something else.<\\/div> <\\/td> <\\/tr> <\\/table> <\\/div>What kind of problem?<\\/div>\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 16:03:00 -0500\",\"id\":\"1e3b20559a7daa00e072105aaa3de61e\",\"link\":\"http:\\/\\/forums.techguy.org\\/all-other-software\\/1122437-apache-open-office.html#post8881971\",\"schema\":{\"version\":3},\"subtype\":\"post\",\"title\":\"Apache Open Office\",\"type\":\"board\"},\"langua
ge\":{\"confidence\":99,\"tag\":\"en\",\"tag_extended\":\"en\"},\"salience\":{\"content\":{\"sentiment\":-4},\"title\":{\"sentiment\":0}}}";
private String blog = "{\"blog\":{\"author\":{\"name\":\"Stileex\"},\"blog\":{\"link\":\"http:\\/\\/laptop.toprealtime.net\\/\",\"title\":\"Laptop News\"},\"blogid\":\"71107731\",\"content\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning\\u00A0\\u2026\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 21:35:23 +0000\",\"domain\":\"laptop.toprealtime.net\",\"guid\":\"8921b89d1faad3d22a6e2a7ed55765e1\",\"id\":\"1e3b209dfc61af80e072ce1ee22bb088\",\"lang\":\"en\",\"link\":\"http:\\/\\/laptop.toprealtime.net\\/2014\\/msi-ge60-apache-pro-003-15-6%e2%80%b3-excellent-gaming-laptop-pressandupdate\\/\",\"parseddate\":\"Sun, 23 Mar 2014 00:21:59 +0000\",\"postid\":\"ed8205a9-2812-4fa6-a730-692b75adc9d7\",\"title\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \
\u2013 PressAndUpdate\",\"type\":\"post\"},\"interaction\":{\"author\":{\"name\":\"Stileex\"},\"content\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning\\u00A0\\u2026\",\"contenttype\":\"html\",\"created_at\":\"Sat, 22 Mar 2014 21:35:23 +0000\",\"id\":\"1e3b209dfc61af80e072ce1ee22bb088\",\"link\":\"http:\\/\\/laptop.toprealtime.net\\/2014\\/msi-ge60-apache-pro-003-15-6%e2%80%b3-excellent-gaming-laptop-pressandupdate\\/\",\"received_at\":4743640105474628098,\"schema\":{\"version\":3},\"subtype\":\"post\",\"title\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \\u2013 PressAndUpdate\",\"type\":\"blog\"},\"language\":{\"confidence\":99,\"tag\":\"en\",\"tag_extended\":\"en\"},\"salience\":{\"content\":{\"entities\":[{\"name\":\"MSI\",\"sentiment\":0,\"confident\":1,\"
label\":\"Company\",\"evidence\":2,\"type\":\"Company\",\"about\":1,\"themes\":[\"fast performance\",\"great graphics\",\"audio output\",\"gaming freak\",\"gaming laptop\",\"lightweight laptop\",\"stunning \\u2026\"]}],\"sentiment\":2,\"topics\":[{\"name\":\"Video Games\",\"score\":0.64548820257187,\"additional\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning \\u2026\",\"hits\":0},{\"name\":\"Hardware\",\"score\":0.45651730895042,\"additional\":\"If you are a gaming freak and play video games even when you are on the move, you need a gaming laptop that gives a smooth and fast performance and has great graphics and audio output. MSI has come up with a small and lightweight laptop with stunning \\u2026\",\"hits\":0}]},\"title\":{\"sentiment\":6,\"topics\":[{\"name\":\"Video Games\",
\"score\":0.62834107875824,\"additional\":\"MSI GE60 Apache Pro 003 15.6\\u2033: Excellent Gaming Laptop \\u2013 PressAndUpdate\",\"hits\":0}]}}}";
-// @Test
-// public void testDetectTwitter() {
-// Class result = TwitterEventClassifier.detectClass(twitter);
-// if( !result.equals(Tweet.class) )
-// Assert.fail();
-// }
+ @Test
+ public void testDetectTwitter() {
+ Class result = DatasiftEventClassifier.detectClass(twitter);
+ if( !result.equals(Twitter.class) )
+ Assert.fail();
+ }
@Test
public void testDetectFacebook() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd b/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
index 6558189..b41ecfe 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
+++ b/streams-contrib/streams-provider-sysomos/src/main/xmlschema/com/sysomos/sysomos.xsd
@@ -50,7 +50,7 @@
<xs:element type="xs:anyURI" name="tweetJsonLink" minOccurs="0"/>
<xs:element type="xs:anyURI" name="tweetHbLink" minOccurs="0"/>
<xs:element type="xs:string" name="twitterFollowers" minOccurs="0"/>
- <xs:element type="xs:string" name=" " minOccurs="0"/>
+ <xs:element type="xs:string" name="twitterFollowing" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 5e88bdd..aca8dc4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -43,11 +43,16 @@ public class TwitterTypeConverter implements StreamsProcessor {
public final static String TERMINATE = new String("TERMINATE");
+ public TwitterTypeConverter() {
+ // for pig
+ }
+
public TwitterTypeConverter(Class inClass, Class outClass) {
this.inClass = inClass;
this.outClass = outClass;
}
+
public Queue<StreamsDatum> getProcessorOutputQueue() {
return outQueue;
}
@@ -175,9 +180,19 @@ public class TwitterTypeConverter implements StreamsProcessor {
}
@Override
- public void prepare(Object o) {
+ public void prepare(Object configurationObject) {
mapper = new StreamsTwitterMapper();
twitterJsonActivitySerializer = new TwitterJsonActivitySerializer();
+ if( configurationObject instanceof String[] ) {
+ // for pig
+ String[] prepareArgs = (String[]) configurationObject;
+ try {
+ inClass = Class.forName(prepareArgs[0]);
+ outClass = Class.forName(prepareArgs[1]);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
index 6b61036..da17af6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java
@@ -24,6 +24,18 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper {
public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy");
+ public static final Long getMillis(String dateTime) {
+
+ // this function is for pig which doesn't handle exceptions well
+ try {
+ Long result = TWITTER_FORMAT.parseMillis(dateTime);
+ return result;
+ } catch( Exception e ) {
+ return null;
+ }
+
+ }
+
private static final StreamsTwitterMapper INSTANCE = new StreamsTwitterMapper();
public static StreamsTwitterMapper getInstance(){
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
index 51e39b1..0d13eaa 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
@@ -87,7 +87,7 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
} catch( Exception e ) {
throw new ActivitySerializerException("Unable to determine content", e);
}
- activity.setUrl("http://twitter.com/" + retweet.getIdStr());
+ activity.setUrl("http://twitter.com/" + retweet.getUser().getIdStr() + "/status/" + retweet.getIdStr());
activity.setLinks(TwitterJsonTweetActivitySerializer.getLinks(retweet.getRetweetedStatus()));
addTwitterExtension(activity, mapper.convertValue(retweet, ObjectNode.class));
addLocationExtension(activity, retweet);
@@ -130,7 +130,6 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer<
public static void addLocationExtension(Activity activity, Retweet retweet) {
Map<String, Object> extensions = ensureExtensions(activity);
Map<String, Object> location = new HashMap<String, Object>();
- location.put("id", TwitterJsonActivitySerializer.formatId(retweet.getIdStr()));
location.put("coordinates", retweet.getCoordinates());
extensions.put("location", location);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index e830884..74c011a 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -82,7 +82,7 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
activity.setProvider(getProvider());
activity.setTitle("");
activity.setContent(tweet.getText());
- activity.setUrl("http://twitter.com/" + tweet.getIdStr());
+ activity.setUrl("http://twitter.com/" + tweet.getUser().getIdStr() + "/status/" + tweet.getIdStr());
activity.setLinks(getLinks(tweet));
addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class));
@@ -132,12 +132,6 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
public static void addLocationExtension(Activity activity, Tweet tweet) {
Map<String, Object> extensions = ensureExtensions(activity);
Map<String, Object> location = new HashMap<String, Object>();
- location.put("id", formatId(
- Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull()
- ));
location.put("coordinates", tweet.getCoordinates());
extensions.put("location", location);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
index 3f43477..c0768e3 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java
@@ -12,6 +12,7 @@ import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.UserstreamEvent;
import java.util.List;
@@ -46,42 +47,45 @@ public class TwitterJsonUserstreameventActivitySerializer implements ActivitySer
return null;
}
- public Activity convert(ObjectNode event) throws ActivitySerializerException {
+ public Activity convert(ObjectNode item) throws ActivitySerializerException {
ObjectMapper mapper = StreamsTwitterMapper.getInstance();
- Delete delete = null;
+ UserstreamEvent event = null;
try {
- delete = mapper.treeToValue(event, Delete.class);
+ event = mapper.treeToValue(item, UserstreamEvent.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
Activity activity = new Activity();
- activity.setActor(buildActor(delete));
- activity.setVerb("delete");
- activity.setObject(buildActivityObject(delete));
- activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr()));
+ activity.setActor(buildActor(event));
+ activity.setVerb(detectVerb(event));
+ activity.setObject(buildActivityObject(event));
+ activity.setId(TwitterJsonActivitySerializer.formatId(activity.getVerb()));
if(Strings.isNullOrEmpty(activity.getId()))
throw new ActivitySerializerException("Unable to determine activity id");
activity.setProvider(getProvider());
- addTwitterExtension(activity, event);
return activity;
}
- public Actor buildActor(Delete delete) {
+ public Actor buildActor(UserstreamEvent event) {
Actor actor = new Actor();
- actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
+ //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr()));
return actor;
}
- public ActivityObject buildActivityObject(Delete delete) {
+ public ActivityObject buildActivityObject(UserstreamEvent event) {
ActivityObject actObj = new ActivityObject();
- actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
- actObj.setObjectType("tweet");
+ //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr()));
+ //actObj.setObjectType("tweet");
return actObj;
}
- public ActivityObject buildTarget(Tweet tweet) {
+ public String detectVerb(UserstreamEvent event) {
+ return null;
+ }
+
+ public ActivityObject buildTarget(UserstreamEvent event) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
index 652567e..07b5883 100644
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/UserstreamEvent.json
@@ -10,9 +10,23 @@
},
"event_type": {
"type": "string",
- "items": {
- "type": "integer"
- }
+ "enum" : [
+ "access_revoked",
+ "block",
+ "unblock",
+ "favorite",
+ "unfavorite",
+ "follow",
+ "unfollow",
+ "list_created",
+ "list_destroyed",
+ "list_updated",
+ "list_member_added",
+ "list_member_removed",
+ "list_user_subscribed",
+ "list_user_unsubscribed",
+ "user_update"
+ ]
},
"source": {
"type": "string",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
index 1f6c86d..241aaed 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java
@@ -45,8 +45,6 @@ public class TweetSerDeTest {
public void Tests()
{
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
InputStream is = TweetSerDeTest.class.getResourceAsStream("/testtweets.txt");
InputStreamReader isr = new InputStreamReader(is);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 78623b0..9866697 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -117,6 +117,10 @@ public class StreamsDatum implements Serializable {
return id;
}
+ public void setId(String id) {
+ this.id = id;
+ }
+
@Override
public boolean equals(Object o) {
if(o instanceof StreamsDatum) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
index f9934b6..f489911 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java
@@ -32,7 +32,7 @@ public class StreamsJacksonMapper extends ObjectMapper {
super();
registerModule(new StreamsJacksonModule());
disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
- configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.TRUE);
configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
@@ -42,6 +42,7 @@ public class StreamsJacksonMapper extends ObjectMapper {
configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, Boolean.FALSE);
configure(SerializationFeature.WRITE_EMPTY_JSON_ARRAYS, Boolean.FALSE);
configure(SerializationFeature.WRITE_NULL_MAP_VALUES, Boolean.FALSE);
+ configure(SerializationFeature.WRITE_NULL_MAP_VALUES, Boolean.FALSE);
setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.DEFAULT);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
index bcf0f65..6fcc1f1 100644
--- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
+++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java
@@ -4,6 +4,8 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import org.joda.time.DateTime;
import org.joda.time.Period;
+import java.util.Map;
+
/**
* Created by sblackmon on 3/27/14.
*/
@@ -16,6 +18,9 @@ public class StreamsJacksonModule extends SimpleModule {
addSerializer(Period.class, new StreamsPeriodSerializer(Period.class));
addDeserializer(Period.class, new StreamsPeriodDeserializer(Period.class));
+
+ addSerializer(Map.class, new StreamsMapSerializer(Map.class));
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/pom.xml b/streams-runtimes/streams-runtime-pig/pom.xml
index e62a7c8..2b0a9c6 100644
--- a/streams-runtimes/streams-runtime-pig/pom.xml
+++ b/streams-runtimes/streams-runtime-pig/pom.xml
@@ -29,8 +29,8 @@
<artifactId>streams-runtime-pig</artifactId>
<properties>
- <hadoop-client.version>2.0.0-cdh4.5.0.1-SNAPSHOT</hadoop-client.version>
- <pig.version>0.11.0-cdh4.5.0.1-SNAPSHOT</pig.version>
+ <hadoop-client.version>2.0.0-cdh4.5.0</hadoop-client.version>
+ <pig.version>0.11.0-cdh4.5.0</pig.version>
</properties>
<dependencies>
@@ -44,18 +44,18 @@
<artifactId>streams-core</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-runtime-local</artifactId>
- <version>0.1-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-runtime-local</artifactId>
- <version>0.1-SNAPSHOT</version>
- <scope>test-jar</scope>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>org.apache.streams</groupId>-->
+ <!--<artifactId>streams-runtime-local</artifactId>-->
+ <!--<version>0.1-SNAPSHOT</version>-->
+ <!--<scope>test-jar</scope>-->
+ <!--</dependency>-->
+ <!--<dependency>-->
+ <!--<groupId>org.apache.streams</groupId>-->
+ <!--<artifactId>streams-runtime-local</artifactId>-->
+ <!--<version>0.1-SNAPSHOT</version>-->
+ <!--<scope>test</scope>-->
+ <!--</dependency>-->
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
@@ -89,6 +89,10 @@
<artifactId>pigunit</artifactId>
</dependency>
<dependency>
+ <groupId>com.linkedin.datafu</groupId>
+ <artifactId>datafu</artifactId>
+ </dependency>
+ <dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.11</version>
@@ -122,6 +126,13 @@
<version>${pig.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.linkedin.datafu</groupId>
+ <artifactId>datafu</artifactId>
+ <version>1.2.0</version>
+ <scope>compile</scope>
+ </dependency>
+
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
index 6b4705d..2f7538d 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDatumExec.java
@@ -2,6 +2,7 @@ package org.apache.streams.pig;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import datafu.pig.util.AliasableEvalFunc;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
@@ -10,9 +11,12 @@ import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.joda.time.DateTime;
import java.io.IOException;
import java.util.Arrays;
@@ -23,7 +27,7 @@ import java.util.concurrent.TimeUnit;
* Created by sblackmon on 3/25/14.
*/
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
+public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();
@@ -46,19 +50,26 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
}
@Override
- public DataBag exec(Tuple line) throws IOException {
+ public DataBag exec(Tuple input) throws IOException {
- if (line == null || line.size() == 0)
+ if (input == null || input.size() == 0)
return null;
+ DataBag output = BagFactory.getInstance().newDefaultBag();
+
Configuration conf = UDFContext.getUDFContext().getJobConf();
- String id = (String)line.get(0);
- String provider = (String)line.get(1);
- Long timestamp = (Long)line.get(2);
- String object = (String)line.get(3);
+ String id = getString(input, "id");
+ String source = getString(input, "source");
+ Long timestamp;
+ try {
+ timestamp = getLong(input, "timestamp");
+ } catch( Exception e ) {
+ timestamp = RFC3339Utils.parseUTC(getString(input, "timestamp")).getMillis();
+ }
+ String object = getString(input, "object");
- StreamsDatum entry = new StreamsDatum(object);
+ StreamsDatum entry = new StreamsDatum(object, id, new DateTime(timestamp));
List<StreamsDatum> resultSet = streamsProcessor.process(entry);
List<Tuple> resultTupleList = Lists.newArrayList();
@@ -66,7 +77,7 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
for( StreamsDatum resultDatum : resultSet ) {
Tuple tuple = mTupleFactory.newTuple();
tuple.append(id);
- tuple.append(provider);
+ tuple.append(source);
tuple.append(timestamp);
tuple.append(resultDatum.getDocument());
resultTupleList.add(tuple);
@@ -81,4 +92,9 @@ public class StreamsProcessDatumExec extends EvalFunc<DataBag> {
public void finish() {
streamsProcessor.cleanUp();
}
+
+ @Override
+ public Schema getOutputSchema(Schema schema) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
index 5b0890f..5455fd9 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import datafu.pig.util.SimpleEvalFunc;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
@@ -28,79 +29,53 @@ import java.util.concurrent.TimeUnit;
* Created by sblackmon on 3/25/14.
*/
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = 10)
-public class StreamsProcessDocumentExec extends EvalFunc<String> {
-
- TupleFactory mTupleFactory = TupleFactory.getInstance();
- BagFactory mBagFactory = BagFactory.getInstance();
+public class StreamsProcessDocumentExec extends SimpleEvalFunc<String> {
StreamsProcessor streamsProcessor;
ObjectMapper mapper = StreamsJacksonMapper.getInstance();
public StreamsProcessDocumentExec(String... execArgs) throws ClassNotFoundException{
- System.out.println("A");
Preconditions.checkNotNull(execArgs);
- System.out.println(Arrays.toString(execArgs));
- System.out.println("B");
Preconditions.checkArgument(execArgs.length > 0);
- System.out.println(execArgs.length);
String classFullName = execArgs[0];
- System.out.println("C");
Preconditions.checkNotNull(classFullName);
- System.out.println("E");
+ String[] prepareArgs = (String[]) ArrayUtils.remove(execArgs, 0);
streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(classFullName));
if( execArgs.length == 1 ) {
- System.out.println("PREPARE NULL");
streamsProcessor.prepare(null);
- } else if( execArgs.length == 2 ) {
- System.out.println("PREPARE 1 " + execArgs[1]);
- streamsProcessor.prepare(execArgs[1]);
- } else {
- System.out.println("PREPARE ARRAY " + Arrays.toString(execArgs));
- streamsProcessor.prepare(execArgs);
+ } else if( execArgs.length > 1 ) {
+ streamsProcessor.prepare(prepareArgs);
}
- System.out.println("F");
}
- @Override
- public String exec(Tuple input) throws IOException {
+ public String call(String input) throws IOException {
- System.out.println("H");
Preconditions.checkNotNull(streamsProcessor);
- Preconditions.checkNotNull(input);
- Preconditions.checkArgument(input.size() == 1);
- System.out.println("I");
-
- String document = (String) input.get(0);
-
- Preconditions.checkNotNull(document);
- // System.out.println(document);
+ try {
- StreamsDatum entry = new StreamsDatum(document);
+ Preconditions.checkNotNull(input);
- Preconditions.checkNotNull(entry);
+ StreamsDatum entry = new StreamsDatum(input);
- // System.out.println(entry);
+ Preconditions.checkNotNull(entry);
- List<StreamsDatum> resultSet = streamsProcessor.process(entry);
+ List<StreamsDatum> resultSet = streamsProcessor.process(entry);
- System.out.println(resultSet);
+ Object resultDoc = null;
+ resultDoc = resultSet.get(0).getDocument();
- Object resultDoc = null;
- for( StreamsDatum resultDatum : resultSet ) {
- resultDoc = resultDatum.getDocument();
- }
-
- Preconditions.checkNotNull(resultDoc);
+ Preconditions.checkNotNull(resultDoc);
- System.out.println(resultDoc);
+ if( resultDoc instanceof String )
+ return (String) resultDoc;
+ else
+ return mapper.writeValueAsString(resultDoc);
- if( resultDoc instanceof String )
- return (String) resultDoc;
- else if( resultDoc instanceof ObjectNode)
- return mapper.writeValueAsString(resultDoc);
- else
+ } catch (Exception e) {
+ System.err.println("Error with " + input.toString());
return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/040e2c4c/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
index 67f3c61..50c7fab 100644
--- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
+++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java
@@ -3,6 +3,7 @@ package org.apache.streams.pig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import datafu.pig.util.SimpleEvalFunc;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
@@ -26,7 +27,7 @@ import java.util.concurrent.TimeUnit;
* Created by sblackmon on 3/25/14.
*/
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 10, intDefault = 10)
-public class StreamsSerializerExec extends EvalFunc<String> {
+public class StreamsSerializerExec extends SimpleEvalFunc<String> {
ActivitySerializer activitySerializer;
ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -42,26 +43,24 @@ public class StreamsSerializerExec extends EvalFunc<String> {
activitySerializer = StreamsComponentFactory.getSerializerInstance(Class.forName(classFullName));
}
- @Override
- public String exec(Tuple input) throws IOException {
+ public String call(String input) throws IOException {
Preconditions.checkNotNull(activitySerializer);
Preconditions.checkNotNull(input);
- Preconditions.checkArgument(input.size() == 1);
- Configuration conf = UDFContext.getUDFContext().getJobConf();
- String document = (String) input.get(0);
-
- Preconditions.checkNotNull(document);
Activity activity = null;
try {
- activity = activitySerializer.deserialize(document);
+ activity = activitySerializer.deserialize(input);
+
+ Preconditions.checkNotNull(activity);
+
+ return mapper.writeValueAsString(activity);
+
} catch( Exception e ) {
e.printStackTrace();
- }
- Preconditions.checkNotNull(activity);
- return mapper.writeValueAsString(activity);
+ return null;
+ }
}