Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:19 UTC
[41/71] [abbrv] git commit: adding uncommitted core classes,
and updates to twitter, es, mongo, hdfs
adding uncommitted core classes, and updates to twitter, es, mongo, hdfs
git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572204 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4705fcb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4705fcb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4705fcb7
Branch: refs/heads/master
Commit: 4705fcb7597550fb5fae9c85c2ec537264291038
Parents: c028136
Author: sblackmon <sb...@unknown>
Authored: Wed Feb 26 19:15:42 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Wed Feb 26 19:15:42 2014 +0000
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../streams-persist-elasticsearch/pom.xml | 1 +
.../ElasticsearchPersistWriter.java | 4 +
.../elasticsearch/PercolateProcessor.java | 150 +++++++++++
.../ElasticsearchWriterConfiguration.json | 18 ++
.../streams/hdfs/WebHdfsPersistWriter.java | 18 +-
.../streams/hdfs/WebHdfsPersistWriterTask.java | 2 +-
.../streams/mongo/MongoPersistWriter.java | 36 +--
.../streams/mongo/MongoPersistWriterTask.java | 38 ---
.../twitter/provider/TwitterEventProcessor.java | 194 --------------
.../provider/TwitterProfileProcessor.java | 111 --------
.../twitter/provider/TwitterStreamProvider.java | 1 +
.../provider/TwitterTimelineProvider.java | 195 +++++++++-----
.../provider/TwitterTimelineProviderTask.java | 2 +-
.../twitter/provider/TwitterTypeConverter.java | 199 --------------
.../apache/streams/core/StreamsOperation.java | 23 ++
.../core/builders/InvalidStreamException.java | 23 ++
.../core/builders/LocalStreamBuilder.java | 256 +++++++++++++++++++
.../streams/core/builders/StreamBuilder.java | 97 +++++++
.../streams/core/builders/StreamComponent.java | 217 ++++++++++++++++
.../streams/core/tasks/BaseStreamsTask.java | 89 +++++++
.../streams/core/tasks/StreamsMergeTask.java | 58 +++++
.../core/tasks/StreamsPersistWriterTask.java | 103 ++++++++
.../core/tasks/StreamsProcessorTask.java | 102 ++++++++
.../streams/core/tasks/StreamsProviderTask.java | 132 ++++++++++
.../apache/streams/core/tasks/StreamsTask.java | 58 +++++
26 files changed, 1490 insertions(+), 638 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index ad559d8..07660ee 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
<module>streams-persist-elasticsearch</module>
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
+ <module>streams-persist-mongo</module>
<!--<module>streams-provider-datasift</module>-->
<!--<module>streams-provider-facebook</module>-->
<!--<module>streams-provider-gnip</module>-->
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 2b99973..84aad95 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -74,6 +74,7 @@
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 6afe959..12d6d06 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -116,6 +116,10 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
this.config = ElasticsearchConfigurator.detectConfiguration(config);
}
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
private static final int BYTES_IN_MB = 1024*1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
private volatile int totalByteCount = 0;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
new file mode 100644
index 0000000..e625381
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -0,0 +1,150 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class PercolateProcessor implements StreamsProcessor, Runnable
+{
+ private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ protected Queue<StreamsDatum> inQueue;
+ protected Queue<StreamsDatum> outQueue;
+
+ private ElasticsearchWriterConfiguration config;
+ private ElasticsearchClientManager manager;
+
+ public PercolateProcessor(Queue<StreamsDatum> inQueue) {
+ this.inQueue = inQueue;
+ this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ }
+
+ public ElasticsearchClientManager getManager() {
+ return manager;
+ }
+
+ public void setManager(ElasticsearchClientManager manager) {
+ this.manager = manager;
+ }
+
+ public ElasticsearchWriterConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(ElasticsearchWriterConfiguration config) {
+ this.config = config;
+ }
+
+ public void start() {
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(manager);
+ Preconditions.checkNotNull(manager.getClient());
+ }
+
+ public void stop() {
+
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ String json;
+ ObjectNode node;
+ // first check for valid json
+ if( entry.getDocument() instanceof String ) {
+ json = (String) entry.getDocument();
+ try {
+ node = (ObjectNode) mapper.readTree(json);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ } else {
+ node = (ObjectNode) entry.getDocument();
+ json = node.asText();
+ }
+
+ PercolateResponse response = manager.getClient().preparePercolate(config.getIndex(), config.getType()).setSource(json).execute().actionGet();
+
+ ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+
+ for( String match : response.getMatches())
+ tagArray.add(match);
+
+ // need utility methods for get / create specific node
+ ObjectNode extensions = (ObjectNode) node.get("extensions");
+ ObjectNode w2o = (ObjectNode) extensions.get("w2o");
+ w2o.put("tags", tagArray);
+
+ result.add(entry);
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object o) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ @Override
+ public void run() {
+
+ while(true) {
+ StreamsDatum item;
+ try {
+ item = inQueue.poll();
+
+ Thread.sleep(new Random().nextInt(100));
+
+ for( StreamsDatum entry : process(item)) {
+ outQueue.offer(entry);
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
new file mode 100644
index 0000000..21aad5c
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -0,0 +1,18 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration",
+ "extends": {"$ref":"ElasticsearchConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "index": {
+ "type": "string",
+ "description": "Index to write to"
+ },
+ "type": {
+ "type": "string",
+ "description": "Type to write as"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index de87ba5..de21055 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,6 +18,8 @@ import org.slf4j.LoggerFactory;
import org.apache.streams.hdfs.HdfsConfiguration;
+import java.io.Closeable;
+import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
@@ -26,7 +28,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
{
private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
@@ -300,13 +302,13 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
}
}
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
-
- public Queue<StreamsDatum> getPersistQueue() {
- return persistQueue;
- }
+// public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+// this.persistQueue = persistQueue;
+// }
+//
+// public Queue<StreamsDatum> getPersistQueue() {
+// return persistQueue;
+// }
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
index 02e4519..2e90a00 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
@@ -20,7 +20,7 @@ public class WebHdfsPersistWriterTask implements Runnable {
public void run() {
while(true) {
- if( writer.getPersistQueue().peek() != null ) {
+ if( writer.persistQueue.peek() != null ) {
try {
StreamsDatum entry = writer.persistQueue.remove();
writer.write(entry);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 5ab8470..8d8d4b3 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MongoPersistWriter implements StreamsPersistWriter, Runnable
@@ -110,14 +111,12 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable
client.cleanCursors(true);
}
- @Override
public void start() {
connectToMongo();
}
- @Override
public void stop() {
try {
@@ -132,32 +131,39 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable
}
}
- @Override
public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
this.persistQueue = persistQueue;
}
- @Override
public Queue<StreamsDatum> getPersistQueue() {
return persistQueue;
}
-
- @Override
public void run() {
- start();
+ while(true) {
+ if( persistQueue.peek() != null ) {
+ try {
+ StreamsDatum entry = persistQueue.remove();
+ write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException e) {}
+ }
- Thread task = new Thread(new MongoPersistWriterTask(this));
- task.start();
+ }
- try {
- task.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+ @Override
+ public void cleanUp() {
stop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
deleted file mode 100644
index 398d1cd..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.streams.mongo;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class MongoPersistWriterTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriterTask.class);
-
- private MongoPersistWriter writer;
-
- public MongoPersistWriterTask(MongoPersistWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public void run() {
-
- while(true) {
- if( writer.getPersistQueue().peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(1));
- } catch (InterruptedException e) {}
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
deleted file mode 100644
index 871c5f0..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements StreamsProcessor, Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private BlockingQueue<String> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private Class inClass;
- private Class outClass;
-
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
- this.inQueue = inQueue;
- this.outQueue = outQueue;
- this.outClass = outClass;
- }
-
- @Override
- public void run() {
-
- while(true) {
- String item;
- try {
- item = inQueue.poll();
- if(item instanceof String && item.equals(TERMINATE)) {
- LOGGER.info("Terminating!");
- break;
- }
-
- ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-
- StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
- for( StreamsDatum entry : process(rawDatum)) {
- outQueue.offer(entry);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
-
- }
- }
- }
-
- public Object convert(ObjectNode event, Class inClass, Class outClass) {
-
- LOGGER.debug(event.toString());
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- if( inClass.equals( Delete.class )) {
- LOGGER.debug("ACTIVITY DELETE");
- result = twitterJsonDeleteActivitySerializer.convert(event);
- } else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("ACTIVITY RETWEET");
- result = twitterJsonRetweetActivitySerializer.convert(event);
- } else if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("ACTIVITY TWEET");
- result = twitterJsonTweetActivitySerializer.convert(event);
- } else {
- return null;
- }
- } else if( outClass.equals( Tweet.class )) {
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- result = mapper.convertValue(event, Tweet.class);
- }
- } else if( outClass.equals( Retweet.class )) {
- if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- result = mapper.convertValue(event, Retweet.class);
- }
- } else if( outClass.equals( Delete.class )) {
- if ( inClass.equals( Delete.class )) {
- LOGGER.debug("DELETE");
- result = mapper.convertValue(event, Delete.class);
- }
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null )
- return result;
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- // first check for valid json
- ObjectNode node = (ObjectNode) entry.getDocument();
-
- String json = node.asText();
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(json);
-
- // if the target is string, just pass-through
- if( java.lang.String.class.equals(outClass))
- return Lists.newArrayList(new StreamsDatum(json));
- else {
- // convert to desired format
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- return Lists.newArrayList(new StreamsDatum(out));
- }
-
- return Lists.newArrayList();
-
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
deleted file mode 100644
index 3f9c24b..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- @Override
- public void run() {
-
- while(true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
- if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
- LOGGER.info("Terminating!");
- break;
- }
-
- Thread.sleep(new Random().nextInt(100));
-
- for( StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
-
-
- } catch (Exception e) {
- e.printStackTrace();
-
- }
- }
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
- String item;
- try {
- // first check for valid json
- // since data is coming from outside provider, we don't know what type the events are
- if( entry.getDocument() instanceof String) {
- item = (String) entry.getDocument();
- } else {
- item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
- }
-
- Class inClass = TwitterEventClassifier.detectClass(item);
-
- User user;
-
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- Tweet tweet = mapper.readValue(item, Tweet.class);
- user = tweet.getUser();
- result.add(new StreamsDatum(user));
- }
- else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- Retweet retweet = mapper.readValue(item, Retweet.class);
- user = retweet.getRetweetedStatus().getUser();
- result.add(new StreamsDatum(user));
- } else {
- return Lists.newArrayList();
- }
-
- return result;
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn("Error processing " + entry.toString());
- return Lists.newArrayList();
- }
- }
-
- @Override
- public void prepare(Object o) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 0520b0f..34b3ab1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -20,6 +20,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a91a7ec..052a360 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,6 +2,10 @@ package org.apache.streams.twitter.provider;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -15,13 +19,14 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
+import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.*;
@@ -29,7 +34,9 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+
+ private final static String STREAMS_ID = "TwitterTimelineProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
@@ -45,11 +52,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
this.config = config;
}
- protected volatile BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
-
protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
protected Twitter client;
+ protected Iterator<Long> ids;
ListenableFuture providerTaskComplete;
//
@@ -92,69 +98,118 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
return this.providerQueue;
}
- public void run() {
-
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
- Preconditions.checkNotNull(providerQueue);
+// public void run() {
+//
+// LOGGER.info("{} Running", STREAMS_ID);
+//
+// while( ids.hasNext() ) {
+// Long currentId = ids.next();
+// LOGGER.info("Provider Task Starting: {}", currentId);
+// captureTimeline(currentId);
+// }
+//
+// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID);
+//
+// client.shutdown();
+//
+// LOGGER.info("{} Exiting", STREAMS_ID);
+//
+// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
+// try {
+// Thread.sleep(100);
+// } catch (InterruptedException e) {}
+// }
+// }
- Preconditions.checkNotNull(this.klass);
+ private void captureTimeline(long currentId) {
+
+ Paging paging = new Paging(1, 200);
+ List<Status> statuses = null;
+ boolean KeepGoing = true;
+ boolean hadFailure = false;
+
+ do
+ {
+ int keepTrying = 0;
+
+ // keep trying to load, give it 5 attempts.
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+
+ try
+ {
+ statuses = client.getUserTimeline(currentId, paging);
+
+ for (Status tStat : statuses)
+ {
+// if( provider.start != null &&
+// provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
+// {
+// // they hit the last date we wanted to collect
+// // we can now exit early
+// KeepGoing = false;
+// }
+ // emit the record
+ String json = DataObjectFactory.getRawJSON(tStat);
+
+ providerQueue.offer(new StreamsDatum(json));
+
+ }
+
+ paging.setPage(paging.getPage() + 1);
+
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e)
+ {
+ hadFailure = true;
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ finally
+ {
+ // Shut down the Twitter client to release its resources
+ client.shutdown();
+ }
+ }
+ }
+ while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+ }
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ public StreamsResultSet readCurrent() {
- Preconditions.checkNotNull(config.getFollow());
+ Preconditions.checkArgument(ids.hasNext());
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
+ LOGGER.info("{} readCurrent", STREAMS_ID);
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
- Iterator<Long> ids = config.getFollow().iterator();
while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
+ Long currentId = ids.next();
+ LOGGER.info("Provider Task Starting: {}", currentId);
+ captureTimeline(currentId);
}
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
- }
+ LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID);
- @Override
- public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+ StreamsResultSet result = (StreamsResultSet) ImmutableList.copyOf(Iterators.consumingIterator(providerQueue.iterator()));
+ LOGGER.info("{} providing {} docs", STREAMS_ID, providerQueue.size());
+ LOGGER.info("{} Exiting", STREAMS_ID);
return result;
+
}
- @Override
public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
throw new NotImplementedException();
}
- @Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
this.start = start;
this.end = end;
- run();
+ readCurrent();
StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
return result;
}
@@ -181,6 +236,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
@Override
public void prepare(Object o) {
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
Preconditions.checkNotNull(providerQueue);
Preconditions.checkNotNull(this.klass);
@@ -197,33 +254,29 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
- Iterator<Long> ids = config.getFollow().iterator();
- while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
- }
+ ids = config.getFollow().iterator();
+
+ String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+
+ ConfigurationBuilder builder = new ConfigurationBuilder()
+ .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+ .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+ .setOAuthAccessToken(config.getOauth().getAccessToken())
+ .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+ .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+ .setJSONStoreEnabled(jsonStoreEnabled)
+ .setAsyncNumThreads(3)
+ .setRestBaseURL(baseUrl);
+
+ client = new TwitterFactory(builder.build()).getInstance();
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
}
@Override
public void cleanUp() {
+
+ client.shutdown();
+
shutdownAndAwaitTermination(executor);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index fcab6f5..9619f4f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -60,7 +60,7 @@ public class TwitterTimelineProviderTask implements Runnable {
// emit the record
String json = DataObjectFactory.getRawJSON(tStat);
- provider.inQueue.offer(json);
+ //provider.offer(json);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
deleted file mode 100644
index 0b0507d..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
+++ /dev/null
@@ -1,199 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTypeConverter implements StreamsProcessor, Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private Class inClass;
- private Class outClass;
-
- private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
- private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
- private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public TwitterTypeConverter(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
- inQueue = inputQueue;
- }
-
- public Object convert(ObjectNode event, Class inClass, Class outClass) {
-
- LOGGER.debug(event.toString());
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- if( inClass.equals( Delete.class )) {
- LOGGER.debug("ACTIVITY DELETE");
- result = twitterJsonDeleteActivitySerializer.convert(event);
- } else if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("ACTIVITY RETWEET");
- result = twitterJsonRetweetActivitySerializer.convert(event);
- } else if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("ACTIVITY TWEET");
- result = twitterJsonTweetActivitySerializer.convert(event);
- } else {
- return null;
- }
- } else if( outClass.equals( Tweet.class )) {
- if ( inClass.equals( Tweet.class )) {
- LOGGER.debug("TWEET");
- result = mapper.convertValue(event, Tweet.class);
- }
- } else if( outClass.equals( Retweet.class )) {
- if ( inClass.equals( Retweet.class )) {
- LOGGER.debug("RETWEET");
- result = mapper.convertValue(event, Retweet.class);
- }
- } else if( outClass.equals( Delete.class )) {
- if ( inClass.equals( Delete.class )) {
- LOGGER.debug("DELETE");
- result = mapper.convertValue(event, Delete.class);
- }
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null )
- return result;
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
-
- }
-
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
- }
-
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- StreamsDatum result = null;
-
- try {
-
- Object item = entry.getDocument();
- ObjectNode node;
-
- if( item instanceof String ) {
-
- // if the target is string, just pass-through
- if( String.class.equals(outClass))
- outQueue.offer(entry);
- else {
- // first check for valid json
- node = (ObjectNode)mapper.readTree((String)item);
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass((String)item);
-
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
- }
-
- } else if( item instanceof ObjectNode ) {
-
- // first check for valid json
- node = (ObjectNode)mapper.valueToTree(item);
-
- // since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass((String)item);
-
- Object out = convert(node, inClass, outClass);
-
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
-
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if( result != null )
- return Lists.newArrayList(result);
- else
- return Lists.newArrayList();
- }
-
- @Override
- public void prepare(Object o) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- @Override
- public void run() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
new file mode 100644
index 0000000..213b09d
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
@@ -0,0 +1,23 @@
+package org.apache.streams.core;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public interface StreamsOperation extends Serializable {
+
+ /**
+ * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
+ * @param configurationObject Any object to help initialize the operation, e.g. Map, JobContext, Properties, etc. The type
+ * will be based on where the operation is being run (e.g. Hadoop, Storm, locally, etc.)
+ */
+ public void prepare(Object configurationObject);
+
+ /**
+ * There is no guarantee that this method will ever be called, but upon shutdown of the stream an attempt to call this
+ * method will be made.
+ * Use this method to terminate connections, etc.
+ */
+ public void cleanUp();
+}
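For context, a bare-bones sketch of how an implementer might use this lifecycle (illustrative only, not part of this commit; ExampleOperation is a hypothetical class):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.streams.core.StreamsOperation;

    // Hypothetical operation: a non-serializable resource is created in prepare()
    // and released (best effort) in cleanUp().
    public class ExampleOperation implements StreamsOperation {

        private transient ExecutorService executor; // not serializable, so built in prepare()

        @Override
        public void prepare(Object configurationObject) {
            // configurationObject depends on where the stream runs (e.g. a Map when run locally)
            executor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void cleanUp() {
            // cleanUp() may never be called, so releases here are best effort
            if (executor != null) {
                executor.shutdown();
            }
        }
    }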
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java b/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
new file mode 100644
index 0000000..43c5a4c
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
@@ -0,0 +1,23 @@
+package org.apache.streams.core.builders;
+
+/**
+ * Exception that indicates an invalid stream definition, e.g. duplicate component ids or connections to ids that do not exist.
+ */
+public class InvalidStreamException extends RuntimeException {
+
+ public InvalidStreamException() {
+ super();
+ }
+
+ public InvalidStreamException(String s) {
+ super(s);
+ }
+
+ public InvalidStreamException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public InvalidStreamException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
new file mode 100644
index 0000000..76e925f
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -0,0 +1,256 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link org.apache.streams.core.builders.StreamBuilder} implementation that runs a data processing stream in a single
+ * JVM across many threads. Depending on your data stream, the JVM heap may need to be set to a high value. The default
+ * implementation uses an unbounded {@link java.util.concurrent.ConcurrentLinkedQueue} to connect stream components.
+ */
+public class LocalStreamBuilder implements StreamBuilder{
+
+ private Map<String, StreamComponent> providers;
+ private Map<String, StreamComponent> components;
+ private Queue<StreamsDatum> queue;
+ private Map<String, Object> streamConfig;
+ private ExecutorService executor;
+ private int totalTasks;
+
+ /**
+ *
+ */
+ public LocalStreamBuilder(){
+ this(new ConcurrentLinkedQueue<StreamsDatum>(), null);
+ }
+
+ /**
+ *
+ * @param streamConfig
+ */
+ public LocalStreamBuilder(Map<String, Object> streamConfig) {
+ this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig);
+ }
+
+ /**
+ *
+ * @param queueType
+ */
+ public LocalStreamBuilder(Queue<StreamsDatum> queueType) {
+ this(queueType, null);
+ }
+
+ /**
+ *
+ * @param queueType
+ * @param streamConfig
+ */
+ public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, Object> streamConfig) {
+ this.queue = queueType;
+ this.providers = new HashMap<String, StreamComponent>();
+ this.components = new HashMap<String, StreamComponent>();
+ this.streamConfig = streamConfig;
+ this.totalTasks = 0;
+ }
+
+ @Override
+ public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider, sequence));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider, start, end));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
+ validateId(id);
+ StreamComponent comp = new StreamComponent(id, processor, cloneQueue(), numTasks);
+ this.components.put(id, comp);
+ connectToOtherComponents(inBoundIds, comp);
+ this.totalTasks += numTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
+ validateId(id);
+ StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), numTasks);
+ this.components.put(id, comp);
+ connectToOtherComponents(inBoundIds, comp);
+ this.totalTasks += numTasks;
+ return this;
+ }
+
+ /**
+ * Runs the data stream in this JVM and blocks until completion.
+ */
+ @Override
+ public void start() {
+ boolean isRunning = true;
+ this.executor = Executors.newFixedThreadPool(this.totalTasks);
+ Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
+ Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+ try {
+ for(StreamComponent comp : this.components.values()) {
+ int tasks = comp.getNumTasks();
+ List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+ for(int i=0; i < tasks; ++i) {
+ StreamsTask task = comp.createConnectedTask();
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ compTasks.add(task);
+ }
+ streamsTasks.put(comp.getId(), compTasks);
+ }
+ for(StreamComponent prov : this.providers.values()) {
+ StreamsTask task = prov.createConnectedTask();
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ provTasks.put(prov.getId(), (StreamsProviderTask) task);
+ }
+
+ while(isRunning) {
+ isRunning = false;
+ for(StreamsProviderTask task : provTasks.values()) {
+ isRunning = isRunning || task.isRunning();
+ }
+ if(isRunning) {
+ Thread.sleep(10000);
+ }
+ }
+ this.executor.shutdown();
+ // complete stream shutdown gracefully
+ for(StreamComponent prov : this.providers.values()) {
+ shutDownTask(prov, streamsTasks);
+ }
+ //need to make this configurable
+ if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ this.executor.shutdownNow();
+ this.executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e){
+ // give the stream 30 seconds to shut down gracefully, then force shutdown otherwise
+ for(List<StreamsTask> tasks : streamsTasks.values()) {
+ for(StreamsTask task : tasks) {
+ task.stopTask();
+ }
+ }
+ this.executor.shutdown();
+ try {
+ if(!this.executor.awaitTermination(30, TimeUnit.SECONDS)){
+ this.executor.shutdownNow();
+ }
+ }catch (InterruptedException ie) {
+ this.executor.shutdownNow();
+ throw new RuntimeException(ie);
+ }
+ }
+
+ }
+
+ /**
+ * Shuts down the running tasks in a pseudo depth-first-search manner. Checks that the upstream components have
+ * finished running before shutting down. Waits until the inbound queue is empty before shutting down.
+ * @param comp StreamComponent to shut down.
+ * @param streamTasks the list of non-StreamsProvider tasks for this stream.
+ * @throws InterruptedException
+ */
+ private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
+ List<StreamsTask> tasks = streamTasks.get(comp.getId());
+ if(tasks != null) { //not a StreamProvider
+ boolean parentsShutDown = true;
+ for(StreamComponent parent : comp.getUpStreamComponents()) {
+ List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
+ // if parentTasks == null, it's a provider and is not running anymore
+ if(parentTasks != null) {
+ for(StreamsTask task : parentTasks) {
+ parentsShutDown = parentsShutDown && !task.isRunning();
+ }
+ }
+ }
+ if(parentsShutDown) {
+ for(StreamsTask task : tasks) {
+ task.stopTask();
+ }
+ for(StreamsTask task : tasks) {
+ while(task.isRunning()) {
+ Thread.sleep(500);
+ }
+ }
+ }
+ }
+ Collection<StreamComponent> children = comp.getDownStreamComponents();
+ if(children != null) {
+ for(StreamComponent child : comp.getDownStreamComponents()) {
+ shutDownTask(child, streamTasks);
+ }
+ }
+ }
+
+ /**
+ * NOT IMPLEMENTED.
+ */
+ @Override
+ public void stop() {
+
+ }
+
+ private void connectToOtherComponents(String[] connectToIds, StreamComponent toBeConnected) {
+ for(String id : connectToIds) {
+ StreamComponent upStream = null;
+ if(this.providers.containsKey(id)) {
+ upStream = this.providers.get(id);
+ }
+ else if(this.components.containsKey(id)) {
+ upStream = this.components.get(id);
+ }
+ else {
+ throw new InvalidStreamException("Cannot connect to id, "+id+", because id does not exist.");
+ }
+ upStream.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue());
+ toBeConnected.addInboundQueue(upStream);
+ }
+ }
+
+ private void validateId(String id) {
+ if(this.providers.containsKey(id) || this.components.containsKey(id)) {
+ throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component");
+ }
+ }
+
+
+ private Queue<StreamsDatum> cloneQueue() {
+ return (Queue<StreamsDatum>)SerializationUtil.cloneBySerialization(this.queue);
+ }
+
+
+}
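
The id bookkeeping in validateId(...) and connectToOtherComponents(...) is what enforces the wiring rules at build time. A minimal sketch of the two failure modes, assuming the builder's add*/new* methods delegate to these helpers and that a no-argument constructor exists (the constructor and the add* implementations appear earlier in this file and are not shown in this hunk); MyProvider and MyProcessor are hypothetical implementations of the core interfaces:

    StreamBuilder builder = new LocalStreamBuilder();            // constructor signature assumed
    builder.newReadCurrentStream("twitter", new MyProvider());

    // duplicate id: validateId(...) throws InvalidStreamException
    builder.addStreamsProcessor("twitter", new MyProcessor(), 1, "twitter");

    // unknown upstream id: connectToOtherComponents(...) throws InvalidStreamException
    builder.addStreamsProcessor("convert", new MyProcessor(), 1, "no-such-id");
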
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
new file mode 100644
index 0000000..918eb7a
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
@@ -0,0 +1,97 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+
+/**
+ * Interface for building data streams.
+ *
+ * <pre>
+ * StreamBuilder builder = ...
+ * builder.newReadCurrentStream(. . .)
+ * .addStreamsProcessor(. . .)
+ * ...
+ * .addStreamsPersistWriter(. . .)
+ * builder.run();
+ * </pre>
+ *
+ */
+public interface StreamBuilder {
+
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
+ * @param processorId unique id for this processor - must be unique across the entire stream
+ * @param processor the processor to execute
+ * @param numTasks the number of instances of this processor to run concurrently
+ * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this processor will
+ * receive data from.
+ * @return this
+ */
+ public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
+ * @param persistWriterId unique id for this processor - must be unique across the entire stream
+ * @param writer the writer to execute
+ * @param numTasks the number of instances of this writer to run concurrently
+ * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this writer will
+ * receive data from.
+ * @return this
+ */
+ public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce data.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @return this
+ */
+ public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readNext(BigInteger)} to produce data.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @param sequence sequence to pass to the {@link org.apache.streams.core.StreamsProvider#readNext(BigInteger)} method
+ * @return this
+ */
+ public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readRange(DateTime, DateTime)} to produce data. Whether the start
+ * and end dates are inclusive or exclusive is up to the implementation.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @param start start date
+ * @param end end date
+ * @return this
+ */
+ public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+
+ /**
+ * Builds the stream, and starts it or submits it based on implementation.
+ */
+ public void start();
+
+ /**
+ * Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in
+ * all cases.
+ */
+ public void stop();
+
+
+
+
+
+
+
+
+
+}
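
To make the skeleton in the class Javadoc concrete, a hedged sketch of a full pipeline: MyProvider, MyProcessor, and MyWriter are hypothetical implementations of the core interfaces, and the concrete builder (the LocalStreamBuilder added in this commit) is just one possible choice:

    StreamBuilder builder = new LocalStreamBuilder();                           // concrete builder; constructor assumed
    builder.newReadCurrentStream("twitter", new MyProvider())                   // provider emits StreamsDatum objects
           .addStreamsProcessor("convert", new MyProcessor(), 2, "twitter")     // 2 concurrent tasks, fed by "twitter"
           .addStreamsPersistWriter("es", new MyWriter(), 1, "convert");        // 1 writer task, fed by "convert"
    builder.start();   // with LocalStreamBuilder, blocks until the provider tasks report they are no longer running
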
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
new file mode 100644
index 0000000..2f1b14c
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
@@ -0,0 +1,217 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.core.tasks.StreamsProcessorTask;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * Stores the implementation of a {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
+ * to, and the metadata necessary to construct a data stream.
+ */
+public class StreamComponent {
+
+ private static final int START = 0;
+ private static final int END = 1;
+
+ private String id;
+ private Set<StreamComponent> inBound;
+ private Map<StreamComponent, Queue<StreamsDatum>> outBound;
+ private Queue<StreamsDatum> inQueue;
+ private StreamsProvider provider;
+ private StreamsProcessor processor;
+ private StreamsPersistWriter writer;
+ private DateTime[] dateRange;
+ private BigInteger sequence;
+ private int numTasks = 1;
+
+ /**
+ * Creates a StreamComponent for a {@link org.apache.streams.core.StreamsProvider}.
+ * @param id unique id for this component
+ * @param provider the provider to execute
+ */
+ public StreamComponent(String id, StreamsProvider provider) {
+ this.id = id;
+ this.provider = provider;
+ initializePrivateVariables();
+ }
+
+ /**
+ * Creates a StreamComponent for a {@link org.apache.streams.core.StreamsProvider} that reads a date range.
+ * @param id unique id for this component
+ * @param provider the provider to execute
+ * @param start start of the date range
+ * @param end end of the date range
+ */
+ public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+ this.id = id;
+ this.provider = provider;
+ this.dateRange = new DateTime[2];
+ this.dateRange[START] = start;
+ this.dateRange[END] = end;
+ initializePrivateVariables();
+ }
+
+
+ /**
+ * Creates a StreamComponent for a {@link org.apache.streams.core.StreamsProvider} that reads from a sequence number.
+ * @param id unique id for this component
+ * @param provider the provider to execute
+ * @param sequence sequence to begin reading from
+ */
+ public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+ this.id = id;
+ this.provider = provider;
+ this.sequence = sequence;
+ initializePrivateVariables();
+ }
+
+ /**
+ * Creates a StreamComponent for a {@link org.apache.streams.core.StreamsProcessor}.
+ * @param id unique id for this component
+ * @param processor the processor to execute
+ * @param inQueue the inbound queue for this component
+ * @param numTasks the number of tasks to run concurrently
+ */
+ public StreamComponent(String id, StreamsProcessor processor, Queue<StreamsDatum> inQueue, int numTasks) {
+ this.id = id;
+ this.processor = processor;
+ this.inQueue = inQueue;
+ this.numTasks = numTasks;
+ initializePrivateVariables();
+ }
+
+ /**
+ * Creates a StreamComponent for a {@link org.apache.streams.core.StreamsPersistWriter}.
+ * @param id unique id for this component
+ * @param writer the writer to execute
+ * @param inQueue the inbound queue for this component
+ * @param numTasks the number of tasks to run concurrently
+ */
+ public StreamComponent(String id, StreamsPersistWriter writer, Queue<StreamsDatum> inQueue, int numTasks) {
+ this.id = id;
+ this.writer = writer;
+ this.inQueue = inQueue;
+ this.numTasks = numTasks;
+ initializePrivateVariables();
+ }
+
+ private void initializePrivateVariables() {
+ this.inBound = new HashSet<StreamComponent>();
+ this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>();
+ }
+
+ /**
+ * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component.
+ * @param component the downstream component supplying its inbound queue
+ * @param queue the queue to put processed/provided datums on
+ */
+ public void addOutBoundQueue(StreamComponent component, Queue<StreamsDatum> queue) {
+ this.outBound.put(component, queue);
+ }
+
+ /**
+ * Add a component that supplies data through the inbound queue.
+ * @param component the upstream component that supplies data through the inbound queue
+ */
+ public void addInboundQueue(StreamComponent component) {
+ this.inBound.add(component);
+ }
+
+ /**
+ * The components that are immediately downstream of this component (aka child nodes)
+ * @return Collection of child nodes of this component
+ */
+ public Collection<StreamComponent> getDownStreamComponents() {
+ return this.outBound.keySet();
+ }
+
+ /**
+ * The components that are immediately upstream of this component (aka parent nodes)
+ * @return Collection of parent nodes of this component
+ */
+ public Collection<StreamComponent> getUpStreamComponents() {
+ return this.inBound;
+ }
+
+ /**
+ * The inbound queue for this component
+ * @return inbound queue
+ */
+ public Queue<StreamsDatum> getInBoundQueue() {
+ return this.inQueue;
+ }
+
+ /**
+ * The number of tasks to run for this component
+ * @return the number of tasks
+ */
+ public int getNumTasks() {
+ return this.numTasks;
+ }
+
+ /**
+ * Creates a {@link org.apache.streams.core.tasks.StreamsTask} that runs a clone of this component, with its
+ * inbound and outbound queues connected to the parent and child nodes.
+ * @return StreamsTask for this component
+ */
+ public StreamsTask createConnectedTask() {
+ StreamsTask task;
+ if(this.processor != null) {
+ task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ }
+ else if(this.writer != null) {
+ task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+ task.addInputQueue(this.inQueue);
+ }
+ else if(this.provider != null) {
+ StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+ if(this.dateRange == null && this.sequence == null)
+ task = new StreamsProviderTask(prov);
+ else if(this.sequence != null)
+ task = new StreamsProviderTask(prov, this.sequence);
+ else
+ task = new StreamsProviderTask(prov, this.dateRange[START], this.dateRange[END]);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ }
+ else {
+ throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
+ }
+ return task;
+ }
+
+ /**
+ * The unique id of this component
+ * @return the id
+ */
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(o instanceof StreamComponent)
+ return this.id.equals(((StreamComponent) o).id);
+ else
+ return false;
+ }
+}
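
For illustration, a short sketch of how a builder links two components the way LocalStreamBuilder.connectToOtherComponents(...) does, and then turns them into runnable tasks; MyProvider and MyProcessor are hypothetical implementations, and ConcurrentLinkedQueue (java.util.concurrent) is just one possible Queue<StreamsDatum> implementation:

    Queue<StreamsDatum> procQueue = new ConcurrentLinkedQueue<StreamsDatum>();

    StreamComponent provider  = new StreamComponent("twitter", new MyProvider());
    StreamComponent processor = new StreamComponent("convert", new MyProcessor(), procQueue, 2);

    // the upstream component writes into the downstream component's inbound queue
    provider.addOutBoundQueue(processor, processor.getInBoundQueue());
    processor.addInboundQueue(provider);

    // each component yields a task wired to a serialized clone of its operation
    StreamsTask providerTask  = provider.createConnectedTask();
    StreamsTask processorTask = processor.createConnectedTask();
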
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
new file mode 100644
index 0000000..8c275bf
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
@@ -0,0 +1,89 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Base class for {@link org.apache.streams.core.tasks.StreamsTask} implementations. Manages the task's input and
+ * output queues and the handoff of datums between them.
+ */
+public abstract class BaseStreamsTask implements StreamsTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class);
+
+ private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
+ private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
+ private int inIndex = 0;
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ this.inQueues.add(inputQueue);
+ }
+
+ @Override
+ public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+ this.outQueues.add(outputQueue);
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getInputQueues() {
+ return this.inQueues;
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getOutputQueues() {
+ return this.outQueues;
+ }
+
+ /**
+ * NOT NECESSARY AT THE MOMENT. MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASKS HAVE A MAX
+ * OF 1 INPUT QUEUE.
+ * Round-robins through the input queues to get the next StreamsDatum. If all input queues are empty, it returns null.
+ * @return the next StreamsDatum or null if all input queues are empty.
+ */
+ protected StreamsDatum getNextDatum() {
+ int startIndex = this.inIndex;
+ int index = startIndex;
+ StreamsDatum datum = null;
+ do {
+ datum = this.inQueues.get(index).poll();
+ index = getNextInputQueueIndex();
+ } while( datum == null && startIndex != index);
+ return datum;
+ }
+
+ /**
+ * Adds a StreamsDatum to the outgoing queues. If there are multiple queues, it uses serialization to create
+ * clones of the datum and adds a new clone to each queue.
+ * @param datum the datum to fan out to the outgoing queues
+ */
+ protected void addToOutgoingQueue(StreamsDatum datum) {
+ if(this.outQueues.size() == 1) {
+ this.outQueues.get(0).offer(datum);
+ }
+ else {
+ for(Queue<StreamsDatum> queue : this.outQueues) {
+ try {
+ queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+ } catch (RuntimeException e) {
+ LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
+ LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e);
+ }
+ }
+ }
+ }
+
+ private int getNextInputQueueIndex() {
+ ++this.inIndex;
+ if(this.inIndex >= this.inQueues.size()) {
+ this.inIndex = 0;
+ }
+ return this.inIndex;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
new file mode 100644
index 0000000..f3ad0cc
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * NOT USED. When joins/partitions are implemented, a similar pattern could be followed. Done only as a basic proof
+ * of concept.
+ */
+public class StreamsMergeTask extends BaseStreamsTask {
+
+ private AtomicBoolean keepRunning;
+ private long sleepTime;
+
+ public StreamsMergeTask() {
+ this(DEFAULT_SLEEP_TIME_MS);
+ }
+
+ public StreamsMergeTask(long sleepTime) {
+ this.sleepTime = sleepTime;
+ this.keepRunning = new AtomicBoolean(true);
+ }
+
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public void run() {
+ while(this.keepRunning.get()) {
+ StreamsDatum datum = super.getNextDatum();
+ if(datum != null) {
+ super.addToOutgoingQueue(datum);
+ }
+ else {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ }
+ }
+}
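
Even though the class is marked NOT USED, it is a convenient way to see the BaseStreamsTask queue semantics above: getNextDatum() round-robins across the input queues, and addToOutgoingQueue(...) hands every output queue its own serialized copy. A rough sketch, assuming StreamsTask is a Runnable (LocalStreamBuilder submits tasks to an ExecutorService) and that a StreamsDatum can be built from a plain document object (its constructors are not part of this commit):

    Queue<StreamsDatum> inA  = new ConcurrentLinkedQueue<StreamsDatum>();
    Queue<StreamsDatum> inB  = new ConcurrentLinkedQueue<StreamsDatum>();
    Queue<StreamsDatum> out1 = new ConcurrentLinkedQueue<StreamsDatum>();
    Queue<StreamsDatum> out2 = new ConcurrentLinkedQueue<StreamsDatum>();

    StreamsMergeTask merge = new StreamsMergeTask(100);   // sleep 100ms when both inputs are empty
    merge.addInputQueue(inA);
    merge.addInputQueue(inB);
    merge.addOutputQueue(out1);
    merge.addOutputQueue(out2);

    new Thread(merge).start();
    inA.offer(new StreamsDatum("doc-1"));   // constructor assumed for illustration
    inB.offer(new StreamsDatum("doc-2"));
    // each datum eventually appears once in out1 and once in out2, as independent deep copies
    merge.stopTask();
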
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
new file mode 100644
index 0000000..1a701a7
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
@@ -0,0 +1,103 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Task that executes a {@link org.apache.streams.core.StreamsPersistWriter}, draining its inbound queue and writing
+ * each datum until the task is stopped and the queue is empty.
+ */
+public class StreamsPersistWriterTask extends BaseStreamsTask {
+
+
+
+ private StreamsPersistWriter writer;
+ private long sleepTime;
+ private AtomicBoolean keepRunning;
+ private Map<String, Object> streamConfig;
+ private Queue<StreamsDatum> inQueue;
+ private AtomicBoolean isRunning;
+
+ /**
+ * Default constructor. Uses default sleep of 500ms when inbound queue is empty.
+ * @param writer writer to execute in task
+ */
+ public StreamsPersistWriterTask(StreamsPersistWriter writer) {
+ this(writer, DEFAULT_SLEEP_TIME_MS);
+ }
+
+ /**
+ * Creates a writer task with a configurable sleep time.
+ * @param writer writer to execute in task
+ * @param sleepTime time to sleep when inbound queue is empty.
+ */
+ public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+ this.writer = writer;
+ this.sleepTime = sleepTime;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+ this.streamConfig = config;
+ }
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ this.inQueue = inputQueue;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.isRunning.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.writer.prepare(this.streamConfig);
+ StreamsDatum datum = this.inQueue.poll();
+ while(datum != null || this.keepRunning.get()) {
+ if(datum != null) {
+ this.writer.write(datum);
+ }
+ else {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ datum = this.inQueue.poll();
+ }
+
+ } finally {
+ this.writer.cleanUp();
+ this.isRunning.set(false);
+ }
+ }
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+
+ @Override
+ public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+ throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - addOutputQueue()");
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getInputQueues() {
+ List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+ queues.add(this.inQueue);
+ return queues;
+ }
+}
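
Finally, a sketch of driving this task outside of a builder: myWriter stands in for any StreamsPersistWriter implementation, the stream-config map contents are placeholders, and running the task on a plain Thread assumes StreamsTask is a Runnable, which is how LocalStreamBuilder's ExecutorService uses it:

    Queue<StreamsDatum> toWrite = new ConcurrentLinkedQueue<StreamsDatum>();

    StreamsPersistWriterTask task = new StreamsPersistWriterTask(myWriter, 500);   // myWriter: hypothetical writer
    task.setStreamConfig(new HashMap<String, Object>());    // passed to writer.prepare(...) when run() starts
    task.addInputQueue(toWrite);

    Thread thread = new Thread(task);
    thread.start();            // prepare(), then poll/write loop, sleeping 500ms whenever the queue is empty
    // ... producers offer StreamsDatum objects onto toWrite ...
    task.stopTask();           // loop exits once the queue drains, then writer.cleanUp() runs in the finally block
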