Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:19 UTC

[41/71] [abbrv] git commit: adding uncommitted core classes, and updates to twitter, es, mongo, hdfs

adding uncommitted core classes, and updates to twitter, es, mongo, hdfs


git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572204 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4705fcb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4705fcb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4705fcb7

Branch: refs/heads/master
Commit: 4705fcb7597550fb5fae9c85c2ec537264291038
Parents: c028136
Author: sblackmon <sb...@unknown>
Authored: Wed Feb 26 19:15:42 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Wed Feb 26 19:15:42 2014 +0000

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../streams-persist-elasticsearch/pom.xml       |   1 +
 .../ElasticsearchPersistWriter.java             |   4 +
 .../elasticsearch/PercolateProcessor.java       | 150 +++++++++++
 .../ElasticsearchWriterConfiguration.json       |  18 ++
 .../streams/hdfs/WebHdfsPersistWriter.java      |  18 +-
 .../streams/hdfs/WebHdfsPersistWriterTask.java  |   2 +-
 .../streams/mongo/MongoPersistWriter.java       |  36 +--
 .../streams/mongo/MongoPersistWriterTask.java   |  38 ---
 .../twitter/provider/TwitterEventProcessor.java | 194 --------------
 .../provider/TwitterProfileProcessor.java       | 111 --------
 .../twitter/provider/TwitterStreamProvider.java |   1 +
 .../provider/TwitterTimelineProvider.java       | 195 +++++++++-----
 .../provider/TwitterTimelineProviderTask.java   |   2 +-
 .../twitter/provider/TwitterTypeConverter.java  | 199 --------------
 .../apache/streams/core/StreamsOperation.java   |  23 ++
 .../core/builders/InvalidStreamException.java   |  23 ++
 .../core/builders/LocalStreamBuilder.java       | 256 +++++++++++++++++++
 .../streams/core/builders/StreamBuilder.java    |  97 +++++++
 .../streams/core/builders/StreamComponent.java  | 217 ++++++++++++++++
 .../streams/core/tasks/BaseStreamsTask.java     |  89 +++++++
 .../streams/core/tasks/StreamsMergeTask.java    |  58 +++++
 .../core/tasks/StreamsPersistWriterTask.java    | 103 ++++++++
 .../core/tasks/StreamsProcessorTask.java        | 102 ++++++++
 .../streams/core/tasks/StreamsProviderTask.java | 132 ++++++++++
 .../apache/streams/core/tasks/StreamsTask.java  |  58 +++++
 26 files changed, 1490 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index ad559d8..07660ee 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -40,6 +40,7 @@
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
+        <module>streams-persist-mongo</module>
         <!--<module>streams-provider-datasift</module>-->
         <!--<module>streams-provider-facebook</module>-->
         <!--<module>streams-provider-gnip</module>-->

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 2b99973..84aad95 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -74,6 +74,7 @@
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 6afe959..12d6d06 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -116,6 +116,10 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         this.config = ElasticsearchConfigurator.detectConfiguration(config);
     }
 
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+        this.config = config;
+    }
+
     private static final int  BYTES_IN_MB = 1024*1024;
     private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
     private volatile int  totalByteCount = 0;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
new file mode 100644
index 0000000..e625381
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -0,0 +1,150 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Percolates each incoming document against the queries registered in the
+ * configured Elasticsearch index, and appends the ids of the matching
+ * queries to the document as tags under extensions.w2o.tags.
+ */
+
+public class PercolateProcessor implements StreamsProcessor, Runnable
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    protected Queue<StreamsDatum> inQueue;
+    protected Queue<StreamsDatum> outQueue;
+
+    private ElasticsearchWriterConfiguration config;
+    private ElasticsearchClientManager manager;
+
+    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
+        this.inQueue = inQueue;
+        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    }
+
+    public ElasticsearchClientManager getManager() {
+        return manager;
+    }
+
+    public void setManager(ElasticsearchClientManager manager) {
+        this.manager = manager;
+    }
+
+    public ElasticsearchWriterConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+    }
+
+    public void start() {
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+    }
+
+    public void stop() {
+
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        String json;
+        ObjectNode node;
+        // first check for valid json
+        if( entry.getDocument() instanceof String ) {
+            json = (String) entry.getDocument();
+            try {
+                node = (ObjectNode) mapper.readTree(json);
+            } catch (IOException e) {
+                LOGGER.error("invalid json in datum: {}", e.getMessage());
+                return result;  // empty list; avoids an NPE in run() below
+            }
+        } else {
+            node = (ObjectNode) entry.getDocument();
+            json = node.toString();  // asText() returns "" for object nodes; toString() yields the json
+        }
+
+        PercolateResponse response = manager.getClient().preparePercolate(config.getIndex(), config.getType()).setSource(json).execute().actionGet();
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+
+        for( String match : response.getMatches())
+            tagArray.add(match);
+
+        // need utility methods for get / create specific node
+        ObjectNode extensions = node.with("extensions");  // with() creates the node if absent
+        ObjectNode w2o = extensions.with("w2o");
+        w2o.put("tags", tagArray);
+
+        result.add(entry);
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object o) {
+        start();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    @Override
+    public void run() {
+
+        while(true) {
+            StreamsDatum item;
+            try {
+                item = inQueue.poll();
+
+                if( item == null ) {
+                    // queue is empty; back off briefly before polling again
+                    Thread.sleep(new Random().nextInt(100));
+                    continue;
+                }
+
+                for( StreamsDatum entry : process(item)) {
+                    outQueue.offer(entry);
+                }
+
+
+            } catch (Exception e) {
+                e.printStackTrace();
+
+            }
+        }
+    }
+}
\ No newline at end of file
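
For orientation, registering a percolate query that this processor will match
against could look like the sketch below. It assumes the same 0.90-era
Elasticsearch Java client API as the preparePercolate(index, type) call above;
the index name ("activity"), field, and query label ("apache") are hypothetical.

    import java.io.IOException;

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.index.query.QueryBuilders;

    public class PercolateSetup {
        // registers a percolate query labeled "apache" against the hypothetical
        // index "activity"; documents matching it are then tagged "apache" in
        // extensions.w2o.tags by PercolateProcessor.process(...)
        public static void registerTag(Client client) throws IOException {
            client.prepareIndex("_percolator", "activity", "apache")
                    .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("query", QueryBuilders.termQuery("content", "apache"))
                            .endObject())
                    .setRefresh(true)
                    .execute().actionGet();
        }
    }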

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
new file mode 100644
index 0000000..21aad5c
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration",
+    "extends": {"$ref":"ElasticsearchConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "index": {
+            "type": "string",
+            "description": "Index to write to"
+        },
+        "type": {
+            "type": "string",
+            "description": "Type to write as"
+        }
+    }
+}
\ No newline at end of file
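
Since the pom change above feeds this schema through jsonschema2pojo (with
generateBuilders enabled), the writer configuration can presumably be built in
code along these lines; the setter names are inferred from the property names
and standard bean conventions, and the values are hypothetical:

    ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
    config.setIndex("activity");  // index to write to
    config.setType("tweet");      // type to write as
    // connection settings (hosts, cluster, etc.) are inherited from
    // ElasticsearchConfiguration, which this schema extends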

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index de87ba5..de21055 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,6 +18,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.streams.hdfs.HdfsConfiguration;
 
+import java.io.Closeable;
+import java.io.Flushable;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.URI;
@@ -26,7 +28,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
 
@@ -300,13 +302,13 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
         }
     }
 
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
-
-    public Queue<StreamsDatum> getPersistQueue() {
-        return persistQueue;
-    }
+//    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+//        this.persistQueue = persistQueue;
+//    }
+//
+//    public Queue<StreamsDatum> getPersistQueue() {
+//        return persistQueue;
+//    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
index 02e4519..2e90a00 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
@@ -20,7 +20,7 @@ public class WebHdfsPersistWriterTask implements Runnable {
     public void run() {
 
         while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
+            if( writer.persistQueue.peek() != null ) {
                 try {
                     StreamsDatum entry = writer.persistQueue.remove();
                     writer.write(entry);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 5ab8470..8d8d4b3 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class MongoPersistWriter implements StreamsPersistWriter, Runnable
@@ -110,14 +111,12 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable
         client.cleanCursors(true);
     }
 
-    @Override
     public void start() {
 
         connectToMongo();
 
     }
 
-    @Override
     public void stop() {
 
         try {
@@ -132,32 +131,39 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable
         }
     }
 
-    @Override
     public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
         this.persistQueue = persistQueue;
     }
 
-    @Override
     public Queue<StreamsDatum> getPersistQueue() {
         return persistQueue;
     }
 
-
-    @Override
     public void run() {
 
-        start();
+        while(true) {
+            if( persistQueue.peek() != null ) {
+                try {
+                    StreamsDatum entry = persistQueue.remove();
+                    write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                Thread.sleep(1);  // brief pause so an empty queue does not busy-spin
+            } catch (InterruptedException e) {}
+        }
 
-        Thread task = new Thread(new MongoPersistWriterTask(this));
-        task.start();
+    }
 
-        try {
-            task.join();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            return;
-        }
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
 
+    @Override
+    public void cleanUp() {
         stop();
     }
 }
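
With MongoPersistWriterTask removed (next diff), the writer now runs its own
polling loop, so it is driven directly through the prepare/run/cleanUp
lifecycle. A sketch, assuming the writer instance is already constructed with
its mongo connection details:

    // hypothetical driver for the new lifecycle
    void drive(MongoPersistWriter writer, StreamsDatum datum) {
        Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
        writer.setPersistQueue(queue);
        writer.prepare(null);        // connectToMongo()
        new Thread(writer).start();  // poll-and-write loop shown above
        queue.offer(datum);
        // on shutdown: writer.cleanUp() flushes and disconnects
    }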

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
deleted file mode 100644
index 398d1cd..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.streams.mongo;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class MongoPersistWriterTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriterTask.class);
-
-    private MongoPersistWriter writer;
-
-    public MongoPersistWriterTask(MongoPersistWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
deleted file mode 100644
index 871c5f0..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterEventProcessor implements StreamsProcessor, Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private BlockingQueue<String> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public TwitterEventProcessor(BlockingQueue<String> inQueue, Queue<StreamsDatum> outQueue, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.outClass = outClass;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            String item;
-            try {
-                item = inQueue.poll();
-                if(item instanceof String && item.equals(TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                ObjectNode objectNode = (ObjectNode) mapper.readTree(item);
-
-                StreamsDatum rawDatum = new StreamsDatum(objectNode);
-
-                for( StreamsDatum entry : process(rawDatum)) {
-                    outQueue.offer(entry);
-                }
-
-            } catch (Exception e) {
-                e.printStackTrace();
-
-            }
-        }
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) {
-
-        LOGGER.debug(event.toString());
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            if( inClass.equals( Delete.class )) {
-                LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.convert(event);
-            } else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.convert(event);
-            } else if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.convert(event);
-            } else {
-                return null;
-            }
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-            // no supported conversion were applied
-        if( result != null )
-            return result;
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        // first check for valid json
-        ObjectNode node = (ObjectNode) entry.getDocument();
-
-        String json = node.asText();
-
-        // since data is coming from outside provider, we don't know what type the events are
-        Class inClass = TwitterEventClassifier.detectClass(json);
-
-        // if the target is string, just pass-through
-        if( java.lang.String.class.equals(outClass))
-            return Lists.newArrayList(new StreamsDatum(json));
-        else {
-            // convert to desired format
-            Object out = convert(node, inClass, outClass);
-
-            if( out != null && validate(out, outClass))
-                return Lists.newArrayList(new StreamsDatum(out));
-        }
-
-        return Lists.newArrayList();
-
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
deleted file mode 100644
index 3f9c24b..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.pojo.User;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterProfileProcessor implements StreamsProcessor, Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    @Override
-    public void run() {
-
-        while(true) {
-            StreamsDatum item;
-                try {
-                    item = inQueue.poll();
-                    if(item.getDocument() instanceof String && item.equals(TERMINATE)) {
-                        LOGGER.info("Terminating!");
-                        break;
-                    }
-
-                    Thread.sleep(new Random().nextInt(100));
-
-                    for( StreamsDatum entry : process(item)) {
-                        outQueue.offer(entry);
-                    }
-
-
-            } catch (Exception e) {
-                e.printStackTrace();
-
-            }
-        }
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-        String item;
-        try {
-            // first check for valid json
-            // since data is coming from outside provider, we don't know what type the events are
-            if( entry.getDocument() instanceof String) {
-                item = (String) entry.getDocument();
-            } else {
-                item = mapper.writeValueAsString((ObjectNode)entry.getDocument());
-            }
-
-            Class inClass = TwitterEventClassifier.detectClass(item);
-
-            User user;
-
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                Tweet tweet = mapper.readValue(item, Tweet.class);
-                user = tweet.getUser();
-                result.add(new StreamsDatum(user));
-            }
-            else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                Retweet retweet = mapper.readValue(item, Retweet.class);
-                user = retweet.getRetweetedStatus().getUser();
-                result.add(new StreamsDatum(user));
-            } else {
-                return Lists.newArrayList();
-            }
-
-            return result;
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Error processing " + entry.toString());
-            return Lists.newArrayList();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-};

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 0520b0f..34b3ab1 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -20,6 +20,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a91a7ec..052a360 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,6 +2,10 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -15,13 +19,14 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
+import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.*;
@@ -29,7 +34,9 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+
+    private final static String STREAMS_ID = "TwitterTimelineProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
@@ -45,11 +52,10 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
         this.config = config;
     }
 
-    protected volatile BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
-
     protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
     protected Twitter client;
+    protected Iterator<Long> ids;
 
     ListenableFuture providerTaskComplete;
 //
@@ -92,69 +98,118 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
         return this.providerQueue;
     }
 
-    public void run() {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-        Preconditions.checkNotNull(providerQueue);
+//    public void run() {
+//
+//        LOGGER.info("{} Running", STREAMS_ID);
+//
+//        while( ids.hasNext() ) {
+//            Long currentId = ids.next();
+//            LOGGER.info("Provider Task Starting: {}", currentId);
+//            captureTimeline(currentId);
+//        }
+//
+//        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
+//
+//        client.shutdown();
+//
+//        LOGGER.info("{} Exiting", STREAMS_ID);
+//
+//        while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
+//            try {
+//                Thread.sleep(100);
+//            } catch (InterruptedException e) {}
+//        }
+//    }
 
-        Preconditions.checkNotNull(this.klass);
+    private void captureTimeline(long currentId) {
+
+        Paging paging = new Paging(1, 200);
+        List<Status> statuses = null;
+        boolean keepGoing = true;
+        boolean hadFailure = false;
+
+        do
+        {
+            int keepTrying = 0;
+
+            // retry loading; currently capped at a single attempt (see the loop bound below)
+            //while (keepTrying < 10)
+            while (keepTrying < 1)
+            {
+
+                try
+                {
+                    statuses = client.getUserTimeline(currentId, paging);
+
+                    for (Status tStat : statuses)
+                    {
+//                        if( provider.start != null &&
+//                                provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
+//                        {
+//                            // they hit the last date we wanted to collect
+//                            // we can now exit early
+//                            keepGoing = false;
+//                        }
+                        // emit the record
+                        String json = DataObjectFactory.getRawJSON(tStat);
+
+                        providerQueue.offer(new StreamsDatum(json));
+
+                    }
+
+                    paging.setPage(paging.getPage() + 1);
+
+                    keepTrying = 10;
+                }
+                catch(TwitterException twitterException) {
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+                }
+                catch(Exception e)
+                {
+                    hadFailure = true;
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+                }
+                finally
+                {
+                    // the client must not be shut down here: it is still needed
+                    // for subsequent pages and ids (cleanUp() shuts it down)
+                }
+            }
+        }
+        while ((statuses != null) && (statuses.size() > 0) && keepGoing);
+    }
 
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+    public StreamsResultSet readCurrent() {
 
-        Preconditions.checkNotNull(config.getFollow());
+        Preconditions.checkArgument(ids.hasNext());
 
-        Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
+        LOGGER.info("{} readCurrent", STREAMS_ID);
 
-        Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
-        Iterator<Long> ids = config.getFollow().iterator();
         while( ids.hasNext() ) {
-            Long id = ids.next();
-
-            String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                    .setJSONStoreEnabled(jsonStoreEnabled)
-                    .setAsyncNumThreads(3)
-                    .setRestBaseURL(baseUrl);
-
-            Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-
-            providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
+            Long currentId = ids.next();
+            LOGGER.info("Provider Task Starting: {}", currentId);
+            captureTimeline(currentId);
         }
 
-        for (int i = 0; i < 1; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
-    }
+        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        run();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+        LOGGER.info("{} providing {} docs", STREAMS_ID, providerQueue.size());
+        StreamsResultSet result = (StreamsResultSet) ImmutableList.copyOf(Iterators.consumingIterator(providerQueue.iterator()));
+        LOGGER.info("{} Exiting", STREAMS_ID);
         return result;
+
     }
 
-    @Override
     public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
         throw new NotImplementedException();
     }
 
-    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
         this.start = start;
         this.end = end;
-        run();
+        readCurrent();
         StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
         return result;
     }
@@ -181,6 +236,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
     @Override
     public void prepare(Object o) {
 
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
         Preconditions.checkNotNull(providerQueue);
 
         Preconditions.checkNotNull(this.klass);
@@ -197,33 +254,29 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable, R
         Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
         Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
 
-        Iterator<Long> ids = config.getFollow().iterator();
-        while( ids.hasNext() ) {
-            Long id = ids.next();
-
-            String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                    .setJSONStoreEnabled(jsonStoreEnabled)
-                    .setAsyncNumThreads(3)
-                    .setRestBaseURL(baseUrl);
-
-            Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-            providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
-        }
+        ids = config.getFollow().iterator();
+
+        String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+                .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+                .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+                .setOAuthAccessToken(config.getOauth().getAccessToken())
+                .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+                .setJSONStoreEnabled(jsonStoreEnabled)
+                .setAsyncNumThreads(3)
+                .setRestBaseURL(baseUrl);
+
+        client = new TwitterFactory(builder.build()).getInstance();
 
-        for (int i = 0; i < 1; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
     }
 
     @Override
     public void cleanUp() {
+
+        client.shutdown();
+
         shutdownAndAwaitTermination(executor);
     }
 }
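
The provider is now driven through the StreamsProvider lifecycle instead of
Runnable. A usage sketch with placeholder configuration; note that prepare()
above also checks a klass field, which is set outside this hunk:

    TwitterStreamConfiguration config = new TwitterStreamConfiguration();
    // populate protocol/host/port/version, the oauth credentials, and the
    // account ids to follow -- exactly the fields prepare() validates above

    TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
    provider.prepare(null);                               // builds the twitter4j client
    StreamsResultSet timelines = provider.readCurrent();  // pages through each followed id
    provider.cleanUp();                                   // shuts the client down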

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index fcab6f5..9619f4f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -60,7 +60,7 @@ public class TwitterTimelineProviderTask implements Runnable {
                         // emit the record
                         String json = DataObjectFactory.getRawJSON(tStat);
 
-                        provider.inQueue.offer(json);
+                        //provider.offer(json);
 
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
deleted file mode 100644
index 0b0507d..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
+++ /dev/null
@@ -1,199 +0,0 @@
-package org.apache.streams.twitter.provider;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.twitter.pojo.Delete;
-import org.apache.streams.twitter.pojo.Retweet;
-import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer;
-import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class TwitterTypeConverter implements StreamsProcessor, Runnable {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
-    private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
-    private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public TwitterTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
-        inQueue = inputQueue;
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) {
-
-        LOGGER.debug(event.toString());
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            if( inClass.equals( Delete.class )) {
-                LOGGER.debug("ACTIVITY DELETE");
-                result = twitterJsonDeleteActivitySerializer.convert(event);
-            } else if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("ACTIVITY RETWEET");
-                result = twitterJsonRetweetActivitySerializer.convert(event);
-            } else if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("ACTIVITY TWEET");
-                result = twitterJsonTweetActivitySerializer.convert(event);
-            } else {
-                return null;
-            }
-        } else if( outClass.equals( Tweet.class )) {
-            if ( inClass.equals( Tweet.class )) {
-                LOGGER.debug("TWEET");
-                result = mapper.convertValue(event, Tweet.class);
-            }
-        } else if( outClass.equals( Retweet.class )) {
-            if ( inClass.equals( Retweet.class )) {
-                LOGGER.debug("RETWEET");
-                result = mapper.convertValue(event, Retweet.class);
-            }
-        } else if( outClass.equals( Delete.class )) {
-            if ( inClass.equals( Delete.class )) {
-                LOGGER.debug("DELETE");
-                result = mapper.convertValue(event, Delete.class);
-            }
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-            // no supported conversion were applied
-        if( result != null )
-            return result;
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        StreamsDatum result = null;
-
-        try {
-
-            Object item = entry.getDocument();
-            ObjectNode node;
-
-            if( item instanceof String ) {
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass))
-                    outQueue.offer(entry);
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
-
-                    // since data is coming from outside provider, we don't know what type the events are
-                    Class inClass = TwitterEventClassifier.detectClass((String)item);
-
-                    Object out = convert(node, inClass, outClass);
-
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
-                }
-
-            } else if( item instanceof ObjectNode ) {
-
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
-
-                // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass((String)item);
-
-                Object out = convert(node, inClass, outClass);
-
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    @Override
-    public void run() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
new file mode 100644
index 0000000..213b09d
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
@@ -0,0 +1,23 @@
+package org.apache.streams.core;
+
+import java.io.Serializable;
+
+/**
+ * Base lifecycle shared by all streams components (providers, processors, writers).
+ */
+public interface StreamsOperation extends Serializable {
+
+    /**
+     * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
+     * @param configurationObject Any object to help initialize the operation, e.g. a Map, JobContext, Properties, etc. The type
+     *                            will depend on where the operation is being run (e.g. Hadoop, Storm, or locally).
+     */
+    public void prepare(Object configurationObject);
+
+    /**
+     * There is no guarantee that this method will ever be called, but upon shutdown of the stream an attempt
+     * will be made to call it.
+     * Use this method to terminate connections, etc.
+     */
+    public void cleanUp();
+}
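
As an illustration of this contract, a minimal (hypothetical) operation could
look like:

    public class LoggingOperation implements StreamsOperation {

        // non-serializable resources belong in prepare(), not the constructor,
        // because the operation may be serialized and shipped before it runs
        private transient org.slf4j.Logger logger;

        @Override
        public void prepare(Object configurationObject) {
            logger = org.slf4j.LoggerFactory.getLogger(LoggingOperation.class);
            logger.info("prepared with {}", configurationObject);
        }

        @Override
        public void cleanUp() {
            // best-effort only: cleanUp() may never be called,
            // so nothing critical should live here
            if (logger != null)
                logger.info("shutting down");
        }
    }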

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java b/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
new file mode 100644
index 0000000..43c5a4c
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
@@ -0,0 +1,23 @@
+package org.apache.streams.core.builders;
+
+/**
+ * Exception indicating that a data stream is malformed in some way.
+ */
+public class InvalidStreamException extends RuntimeException {
+
+    public InvalidStreamException() {
+        super();
+    }
+
+    public InvalidStreamException(String s) {
+        super(s);
+    }
+
+    public InvalidStreamException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public InvalidStreamException(Throwable throwable) {
+        super(throwable);
+    }
+}
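
Presumably this is what LocalStreamBuilder's validateId (referenced below)
throws; a hypothetical check of that shape:

    // hypothetical validation of the kind this exception supports
    private void validateId(String id) {
        if (this.providers.containsKey(id) || this.components.containsKey(id))
            throw new InvalidStreamException("duplicate component id: " + id);
    }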

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
new file mode 100644
index 0000000..76e925f
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -0,0 +1,256 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link org.apache.streams.core.builders.StreamBuilder} implementation to run a data processing stream in a single
+ * JVM across many threads.  Depending on your data stream, the JVM heap may need to be set to a high value. The default
+ * implementation uses an unbounded {@link java.util.concurrent.ConcurrentLinkedQueue} to connect stream components.
+ */
+public class LocalStreamBuilder implements StreamBuilder {
+
+    private Map<String, StreamComponent> providers;
+    private Map<String, StreamComponent> components;
+    private Queue<StreamsDatum> queue;
+    private Map<String, Object> streamConfig;
+    private ExecutorService executor;
+    private int totalTasks;
+
+    /**
+     * Creates a builder backed by an unbounded ConcurrentLinkedQueue, with no stream configuration.
+     */
+    public LocalStreamBuilder(){
+        this(new ConcurrentLinkedQueue<StreamsDatum>(), null);
+    }
+
+    /**
+     * Creates a builder backed by an unbounded ConcurrentLinkedQueue.
+     * @param streamConfig stream-wide configuration passed to each task
+     */
+    public LocalStreamBuilder(Map<String, Object> streamConfig) {
+        this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig);
+    }
+
+    /**
+     * Creates a builder whose component-connecting queues are serialized clones of the given queue.
+     * @param queueType prototype queue used to connect stream components
+     */
+    public LocalStreamBuilder(Queue<StreamsDatum> queueType) {
+        this(queueType, null);
+    }
+
+    /**
+     * Creates a builder whose component-connecting queues are serialized clones of the given queue.
+     * @param queueType prototype queue used to connect stream components
+     * @param streamConfig stream-wide configuration passed to each task
+     */
+    public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, Object> streamConfig) {
+        this.queue = queueType;
+        this.providers = new HashMap<String, StreamComponent>();
+        this.components = new HashMap<String, StreamComponent>();
+        this.streamConfig = streamConfig;
+        this.totalTasks = 0;
+    }
+
+    @Override
+    public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider, sequence));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider, start, end));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
+        validateId(id);
+        StreamComponent comp = new StreamComponent(id, processor, cloneQueue(), numTasks);
+        this.components.put(id, comp);
+        connectToOtherComponents(inBoundIds, comp);
+        this.totalTasks += numTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
+        validateId(id);
+        StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), numTasks);
+        this.components.put(id, comp);
+        connectToOtherComponents(inBoundIds, comp);
+        this.totalTasks += numTasks;
+        return this;
+    }
+
+    /**
+     * Runs the data stream in this JVM and blocks until completion.
+     */
+    @Override
+    public void start() {
+        boolean isRunning = true;
+        this.executor = Executors.newFixedThreadPool(this.totalTasks);
+        Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
+        Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+        try {
+            for(StreamComponent comp : this.components.values()) {
+                int tasks = comp.getNumTasks();
+                List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+                for(int i=0; i < tasks; ++i) {
+                    StreamsTask task = comp.createConnectedTask();
+                    task.setStreamConfig(this.streamConfig);
+                    this.executor.submit(task);
+                    compTasks.add(task);
+                }
+                streamsTasks.put(comp.getId(), compTasks);
+            }
+            for(StreamComponent prov : this.providers.values()) {
+                StreamsTask task = prov.createConnectedTask();
+                task.setStreamConfig(this.streamConfig);
+                this.executor.submit(task);
+                provTasks.put(prov.getId(), (StreamsProviderTask) task);
+            }
+
+            while(isRunning) {
+                isRunning = false;
+                for(StreamsProviderTask task : provTasks.values()) {
+                    isRunning = isRunning || task.isRunning();
+                }
+                if(isRunning) {
+                    Thread.sleep(10000);
+                }
+            }
+            this.executor.shutdown();
+            //shut the stream down gracefully
+            for(StreamComponent prov : this.providers.values()) {
+                shutDownTask(prov, streamsTasks);
+            }
+            //need to make this configurable
+            if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+                this.executor.shutdownNow();
+                this.executor.awaitTermination(10, TimeUnit.SECONDS);
+            }
+        } catch (InterruptedException e){
+            //give the stream 30 seconds to shut down gracefully, then force shutdown
+            for(List<StreamsTask> tasks : streamsTasks.values()) {
+                for(StreamsTask task : tasks) {
+                    task.stopTask();
+                }
+            }
+            this.executor.shutdown();
+            try {
+                if(!this.executor.awaitTermination(30, TimeUnit.SECONDS)){
+                    this.executor.shutdownNow();
+                }
+            }catch (InterruptedException ie) {
+                this.executor.shutdownNow();
+                throw new RuntimeException(ie);
+            }
+        }
+
+    }
+
+    /**
+     * Shuts down the running tasks in a pseudo depth-first-search manner. Checks that the upstream components have
+     * finished running before shutting down. Waits until the inbound queue is empty before shutting down.
+     * @param comp StreamComponent to shut down.
+     * @param streamTasks the list of non-StreamsProvider tasks for this stream.
+     * @throws InterruptedException
+     */
+    private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
+        List<StreamsTask> tasks = streamTasks.get(comp.getId());
+        if(tasks != null) { //not a StreamsProvider
+            boolean parentsShutDown = true;
+            for(StreamComponent parent : comp.getUpStreamComponents()) {
+                List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
+                //if parentTasks == null, it's a provider and is no longer running
+                if(parentTasks != null) {
+                    for(StreamsTask task : parentTasks) {
+                        parentsShutDown = parentsShutDown && !task.isRunning();
+                    }
+                }
+            }
+            if(parentsShutDown) {
+                for(StreamsTask task : tasks) {
+                    task.stopTask();
+                }
+                for(StreamsTask task : tasks) {
+                    while(task.isRunning()) {
+                        Thread.sleep(500);
+                    }
+                }
+            }
+        }
+        Collection<StreamComponent> children = comp.getDownStreamComponents();
+        if(children != null) {
+            for(StreamComponent child : comp.getDownStreamComponents()) {
+                shutDownTask(child, streamTasks);
+            }
+        }
+    }
+
+    /**
+     * NOT IMPLEMENTED.
+     */
+    @Override
+    public void stop() {
+
+    }
+
+    private void connectToOtherComponents(String[] connectToIds, StreamComponent toBeConnected) {
+        for(String id : connectToIds) {
+            StreamComponent upStream = null;
+            if(this.providers.containsKey(id)) {
+                upStream = this.providers.get(id);
+            }
+            else if(this.components.containsKey(id)) {
+                upStream = this.components.get(id);
+            }
+            else {
+                throw new InvalidStreamException("Cannot connect to id, "+id+", because id does not exist.");
+            }
+            upStream.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue());
+            toBeConnected.addInboundQueue(upStream);
+        }
+    }
+
+    private void validateId(String id) {
+        if(this.providers.containsKey(id) || this.components.containsKey(id)) {
+            throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component");
+        }
+    }
+
+
+    private Queue<StreamsDatum> cloneQueue() {
+        return (Queue<StreamsDatum>)SerializationUtil.cloneBySerialization(this.queue);
+    }
+
+
+}

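Put together, wiring a stream with this builder might look like the following sketch. The provider, processor, and writer instances (myProvider, myProcessor, myWriter) are hypothetical placeholders for any implementations of the core interfaces.

    StreamBuilder builder = new LocalStreamBuilder();
    builder.newReadCurrentStream("provider", myProvider)
           .addStreamsProcessor("processor", myProcessor, 2, "provider")   // two concurrent processor tasks
           .addStreamsPersistWriter("writer", myWriter, 1, "processor");
    builder.start();  // blocks until the provider finishes and the queues drain
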
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
new file mode 100644
index 0000000..918eb7a
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
@@ -0,0 +1,97 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+
+/**
+ * Interface for building data streams.
+ *
+ * <pre>
+ *     StreamBuilder builder = ...
+ *     builder.newReadCurrentStream(. . .)
+ *            .addStreamsProcessor(. . .)
+ *            ...
+ *            .addStreamsPersistWriter(. . .)
+ *     builder.start();
+ * </pre>
+ *
+ */
+public interface StreamBuilder {
+
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
+     * @param processorId unique id for this processor - must be unique across the entire stream
+     * @param processor the processor to execute
+     * @param numTasks the number of instances of this processor to run concurrently
+     * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this processor will
+     *                     receive data from.
+     * @return this
+     */
+    public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
+     * @param persistWriterId unique id for this writer - must be unique across the entire stream
+     * @param writer the writer to execute
+     * @param numTasks the number of instances of this writer to run concurrently
+     * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation} that this writer will
+     *                     receive data from.
+     * @return this
+     */
+    public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce data.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @return this
+     */
+    public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readNext(BigInteger)} to produce data.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @param sequence sequence to pass to the {@link org.apache.streams.core.StreamsProvider#readNext(BigInteger)} method
+     * @return this
+     */
+    public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readRange(DateTime, DateTime)} to produce data. Whether the start
+     * and end dates are inclusive or exclusive is up to the implementation.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @param start start date
+     * @param end end date
+     * @return this
+     */
+    public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+
+    /**
+     * Builds the stream, and starts it or submits it based on implementation.
+     */
+    public void start();
+
+    /**
+     * Stops the stream processing.  There is no guarantee of a graceful shutdown. Optional method; may not be
+     * implemented in all cases.
+     */
+    public void stop();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
new file mode 100644
index 0000000..2f1b14c
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
@@ -0,0 +1,217 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.core.tasks.StreamsProcessorTask;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * Stores an implementation of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
+ * to, and the metadata necessary to construct a data stream.
+ */
+public class StreamComponent {
+
+    private static final int START = 0;  // index of the range start in the two-element dateRange array
+    private static final int END = 1;    // index of the range end; indices 1 and 2 would overflow the array
+
+    private String id;
+    private Set<StreamComponent> inBound;
+    private Map<StreamComponent, Queue<StreamsDatum>> outBound;
+    private Queue<StreamsDatum> inQueue;
+    private StreamsProvider provider;
+    private StreamsProcessor processor;
+    private StreamsPersistWriter writer;
+    private DateTime[] dateRange;
+    private BigInteger sequence;
+    private int numTasks = 1;
+
+    /**
+     * Creates a provider component that reads the current stream.
+     * @param id unique id of this component
+     * @param provider provider to execute
+     */
+    public StreamComponent(String id, StreamsProvider provider) {
+        this.id = id;
+        this.provider = provider;
+        initializePrivateVariables();
+    }
+
+    /**
+     * Creates a provider component that reads a date range.
+     * @param id unique id of this component
+     * @param provider provider to execute
+     * @param start start of the date range
+     * @param end end of the date range
+     */
+    public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+        this.id = id;
+        this.provider = provider;
+        this.dateRange = new DateTime[2];
+        this.dateRange[START] = start;
+        this.dateRange[END] = end;
+        initializePrivateVariables();
+    }
+
+
+    /**
+     * Creates a provider component that reads starting from a sequence.
+     * @param id unique id of this component
+     * @param provider provider to execute
+     * @param sequence sequence to pass to the provider
+     */
+    public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+        this.id = id;
+        this.provider = provider;
+        this.sequence = sequence;
+        initializePrivateVariables();  // ensure the inBound/outBound collections are created
+    }
+
+    /**
+     * Creates a processor component.
+     * @param id unique id of this component
+     * @param processor processor to execute
+     * @param inQueue inbound queue for this component
+     * @param numTasks number of concurrent tasks to run
+     */
+    public StreamComponent(String id, StreamsProcessor processor, Queue<StreamsDatum> inQueue, int numTasks) {
+        this.id = id;
+        this.processor = processor;
+        this.inQueue = inQueue;
+        this.numTasks = numTasks;
+        initializePrivateVariables();
+    }
+
+    /**
+     * Creates a persist writer component.
+     * @param id unique id of this component
+     * @param writer writer to execute
+     * @param inQueue inbound queue for this component
+     * @param numTasks number of concurrent tasks to run
+     */
+    public StreamComponent(String id, StreamsPersistWriter writer, Queue<StreamsDatum> inQueue, int numTasks) {
+        this.id = id;
+        this.writer = writer;
+        this.inQueue = inQueue;
+        this.numTasks = numTasks;
+        initializePrivateVariables();
+    }
+
+    private void initializePrivateVariables() {
+        this.inBound = new HashSet<StreamComponent>();
+        this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>();
+    }
+
+    /**
+     * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component.
+     * @param component the downstream component that is supplying its inbound queue
+     * @param queue the queue on which to put processed/provided datums
+     */
+    public void addOutBoundQueue(StreamComponent component, Queue<StreamsDatum> queue) {
+        this.outBound.put(component, queue);
+    }
+
+    /**
+     * Add a component that supplies data through the inbound queue.
+     * @param component that supplies data through the inbound queue
+     */
+    public void addInboundQueue(StreamComponent component) {
+        this.inBound.add(component);
+    }
+
+    /**
+     * The components that are immediately downstream of this component (aka child nodes)
+     * @return Collection of child nodes of this component
+     */
+    public Collection<StreamComponent> getDownStreamComponents() {
+        return this.outBound.keySet();
+    }
+
+    /**
+     * The components that are immediately upstream of this component (aka parent nodes)
+     * @return Collection of parent nodes of this component
+     */
+    public Collection<StreamComponent> getUpStreamComponents() {
+        return this.inBound;
+    }
+
+    /**
+     * The inbound queue for this component
+     * @return inbound queue
+     */
+    public Queue<StreamsDatum> getInBoundQueue() {
+        return this.inQueue;
+    }
+
+    /**
+     * The number of tasks used to run this component.
+     * @return number of tasks
+     */
+    public int getNumTasks() {
+        return this.numTasks;
+    }
+
+    /**
+     * Creates a {@link org.apache.streams.core.tasks.StreamsTask} that is running a clone of this component whose
+     * inbound and outbound queues are appropriately connected to the parent and child nodes.
+     * @return StreamsTask for this component
+     */
+    public StreamsTask createConnectedTask() {
+        StreamsTask task;
+        if(this.processor != null) {
+            task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+            task.addInputQueue(this.inQueue);
+            for(Queue<StreamsDatum> q : this.outBound.values()) {
+                task.addOutputQueue(q);
+            }
+        }
+        else if(this.writer != null) {
+            task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+            task.addInputQueue(this.inQueue);
+        }
+        else if(this.provider != null) {
+            StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+            if(this.dateRange == null && this.sequence == null)
+                task = new StreamsProviderTask(prov);
+            else if(this.sequence != null)
+                task = new StreamsProviderTask(prov, this.sequence);
+            else
+                task = new StreamsProviderTask(prov, this.dateRange[START], this.dateRange[END]);
+            for(Queue<StreamsDatum> q : this.outBound.values()) {
+                task.addOutputQueue(q);
+            }
+        }
+        else {
+            throw new InvalidStreamException("Underlying StreamComponent was null.");
+        }
+        return task;
+    }
+
+    /**
+     * The unique id of this component.
+     * @return the component id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if(o instanceof StreamComponent)
+            return this.id.equals(((StreamComponent) o).id);
+        else
+            return false;
+    }
+}

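For reference, the wiring that LocalStreamBuilder performs through connectToOtherComponents amounts to roughly the following sketch (myProvider and myProcessor are hypothetical instances of the core interfaces):

    Queue<StreamsDatum> inQueue = new ConcurrentLinkedQueue<StreamsDatum>();
    StreamComponent source = new StreamComponent("provider", myProvider);
    StreamComponent proc = new StreamComponent("processor", myProcessor, inQueue, 1);
    source.addOutBoundQueue(proc, proc.getInBoundQueue());  // provider writes into the processor's queue
    proc.addInboundQueue(source);                           // processor tracks its upstream parent
    StreamsTask task = proc.createConnectedTask();          // clone of the processor with queues attached
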
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
new file mode 100644
index 0000000..8c275bf
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
@@ -0,0 +1,89 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Base class that manages input and output queues for {@link StreamsTask} implementations.
+ */
+public abstract class BaseStreamsTask implements StreamsTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class);
+
+    private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
+    private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
+    private int inIndex = 0;
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueues.add(inputQueue);
+    }
+
+    @Override
+    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+        this.outQueues.add(outputQueue);
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        return this.inQueues;
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getOutputQueues() {
+        return this.outQueues;
+    }
+
+    /**
+     * Not necessary at the moment, but may become necessary when join tasks are introduced. Currently all tasks have
+     * at most one input queue.
+     * Round-robins through the input queues to get the next StreamsDatum. If all input queues are empty, it will return null.
+     * @return the next StreamsDatum or null if all input queues are empty.
+     */
+    protected StreamsDatum getNextDatum() {
+        int startIndex = this.inIndex;
+        int index = startIndex;
+        StreamsDatum datum = null;
+        do {
+            datum = this.inQueues.get(index).poll();
+            index = getNextInputQueueIndex();
+        } while( datum == null && startIndex != index);
+        return datum;
+    }
+
+    /**
+     * Adds a StreamsDatum to the outgoing queues.  If there are multiple queues, it uses serialization to create
+     * clones of the datum and adds a new clone to each queue.
+     * @param datum the datum to add to the outgoing queues
+     */
+    protected void addToOutgoingQueue(StreamsDatum datum) {
+        if(this.outQueues.size() == 1) {
+            this.outQueues.get(0).offer(datum);
+        }
+        else {
+            for(Queue<StreamsDatum> queue : this.outQueues) {
+                try {
+                    queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+                } catch (RuntimeException e) {
+                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
+                    LOGGER.error("Exception while offering StreamsDatum to outgoing queue", e);  // pass the Throwable directly so the stack trace is logged
+                }
+            }
+        }
+    }
+
+    private int getNextInputQueueIndex() {
+        ++this.inIndex;
+        if(this.inIndex >= this.inQueues.size()) {
+            this.inIndex = 0;
+        }
+        return this.inIndex;
+    }
+}

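The clone-per-queue fan-out relies on a serialize/deserialize round trip. In isolation the idiom looks like the sketch below, assuming SerializationUtil behaves like standard Java serialization and that StreamsDatum exposes a single-argument document constructor:

    StreamsDatum original = new StreamsDatum("payload");
    byte[] bytes = SerializationUtil.serialize(original);
    StreamsDatum clone = (StreamsDatum) SerializationUtil.deserialize(bytes);
    // clone is an independent deep copy; mutating it does not affect original
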
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
new file mode 100644
index 0000000..f3ad0cc
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * NOT USED.  When joins/partitions are implemented, a similar pattern could be followed. Provided only as a basic
+ * proof of concept.
+ */
+public class StreamsMergeTask extends BaseStreamsTask {
+
+    private AtomicBoolean keepRunning;
+    private long sleepTime;
+
+    public StreamsMergeTask() {
+        this(DEFAULT_SLEEP_TIME_MS);
+    }
+
+    public StreamsMergeTask(long sleepTime) {
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+    }
+
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.keepRunning.get();  // reflect whether the merge loop is still active rather than a constant
+    }
+
+    @Override
+    public void run() {
+        while(this.keepRunning.get()) {
+            StreamsDatum datum = super.getNextDatum();
+            if(datum != null) {
+                super.addToOutgoingQueue(datum);
+            }
+            else {
+                try {
+                    Thread.sleep(this.sleepTime);
+                } catch (InterruptedException e) {
+                    this.keepRunning.set(false);
+                }
+            }
+        }
+    }
+}

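Even though the class is unused, the intended fan-in pattern would look something like this sketch (queueA, queueB, and merged are hypothetical unbounded queues):

    Queue<StreamsDatum> queueA = new ConcurrentLinkedQueue<StreamsDatum>();
    Queue<StreamsDatum> queueB = new ConcurrentLinkedQueue<StreamsDatum>();
    Queue<StreamsDatum> merged = new ConcurrentLinkedQueue<StreamsDatum>();

    StreamsMergeTask merge = new StreamsMergeTask();
    merge.addInputQueue(queueA);
    merge.addInputQueue(queueB);
    merge.addOutputQueue(merged);

    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(merge);   // round-robins datums from A and B onto merged
    // ... later
    merge.stopTask();
    executor.shutdown();
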
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
new file mode 100644
index 0000000..1a701a7
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
@@ -0,0 +1,103 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link StreamsTask} that executes a {@link org.apache.streams.core.StreamsPersistWriter} against its single inbound queue.
+ */
+public class StreamsPersistWriterTask extends BaseStreamsTask {
+
+    private StreamsPersistWriter writer;
+    private long sleepTime;
+    private AtomicBoolean keepRunning;
+    private Map<String, Object> streamConfig;
+    private Queue<StreamsDatum> inQueue;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Default constructor.  Uses default sleep of 500ms when inbound queue is empty.
+     * @param writer writer to execute in task
+     */
+    public StreamsPersistWriterTask(StreamsPersistWriter writer) {
+        this(writer, DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     *
+     * @param writer writer to execute in task
+     * @param sleepTime time to sleep when inbound queue is empty.
+     */
+    public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+        this.writer = writer;
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.streamConfig = config;
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueue = inputQueue;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.writer.prepare(this.streamConfig);
+            StreamsDatum datum = this.inQueue.poll();
+            while(datum != null || this.keepRunning.get()) {
+                if(datum != null) {
+                    this.writer.write(datum);
+                }
+                else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+                datum = this.inQueue.poll();
+            }
+
+        } finally {
+            this.writer.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+
+    @Override
+    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - addOutputQueue()");
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+        queues.add(this.inQueue);
+        return queues;
+    }
+}
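
Normally LocalStreamBuilder handles this wiring, but run directly the task would be driven roughly as follows (myWriter is a hypothetical StreamsPersistWriter; the single-argument StreamsDatum constructor is assumed):

    Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
    StreamsPersistWriterTask task = new StreamsPersistWriterTask(myWriter);
    task.addInputQueue(queue);
    task.setStreamConfig(null);  // or a Map of stream-wide settings for prepare()

    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(task);       // prepare() runs, then the queue is polled
    queue.offer(new StreamsDatum("hello"));
    // ... once upstream is done
    task.stopTask();             // run() drains the remaining datums, then cleanUp() fires
    executor.shutdown();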