You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/26 20:15:43 UTC

svn commit: r1572204 - in /incubator/streams/branches/STREAMS-26: streams-contrib/ streams-contrib/streams-persist-elasticsearch/ streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ streams-contrib/streams-pers...

Author: sblackmon
Date: Wed Feb 26 19:15:42 2014
New Revision: 1572204

URL: http://svn.apache.org/r1572204
Log:
adding uncommitted core classes, and updates to twitter, es, mongo, hdfs

Added:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
    incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
Removed:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
Modified:
    incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Wed Feb 26 19:15:42 2014
@@ -40,6 +40,7 @@
         <module>streams-persist-elasticsearch</module>
         <module>streams-persist-hdfs</module>
         <module>streams-persist-kafka</module>
+        <module>streams-persist-mongo</module>
         <!--<module>streams-provider-datasift</module>-->
         <!--<module>streams-provider-facebook</module>-->
         <!--<module>streams-provider-gnip</module>-->

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Wed Feb 26 19:15:42 2014
@@ -74,6 +74,7 @@
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
                         <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -116,6 +116,10 @@ public class ElasticsearchPersistWriter 
         this.config = ElasticsearchConfigurator.detectConfiguration(config);
     }
 
+    public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+        this.config = config;
+    }
+
     private static final int  BYTES_IN_MB = 1024*1024;
     private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
     private volatile int  totalByteCount = 0;

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,205 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Tags documents with the names of the Elasticsearch percolator queries they match.
+ *
+ * <p>Each document is percolated against {@code config.getIndex()} /
+ * {@code config.getType()}, and the names of the matching queries are stored as a
+ * JSON array under {@code extensions.w2o.tags}. Missing {@code extensions} or
+ * {@code w2o} objects are created on demand.</p>
+ *
+ * <p>Documents may be supplied as a JSON {@link String} or as a Jackson
+ * {@link ObjectNode}; the emitted datum carries the document in the same form.</p>
+ *
+ * <p>When used as a {@link Runnable}, datums are polled from the input queue and the
+ * results offered to {@link #getProcessorOutputQueue()} until the thread is interrupted.</p>
+ */
+public class PercolateProcessor implements StreamsProcessor, Runnable
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    // shared source of jitter for the idle back-off in run()
+    private final Random random = new Random();
+
+    protected Queue<StreamsDatum> inQueue;
+    protected Queue<StreamsDatum> outQueue;
+
+    private ElasticsearchWriterConfiguration config;
+    private ElasticsearchClientManager manager;
+
+    /**
+     * @param inQueue queue polled by {@link #run()}; output goes to a new internal
+     *                {@link LinkedBlockingQueue}
+     */
+    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
+        this.inQueue = inQueue;
+        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    }
+
+    public ElasticsearchClientManager getManager() {
+        return manager;
+    }
+
+    public void setManager(ElasticsearchClientManager manager) {
+        this.manager = manager;
+    }
+
+    public ElasticsearchWriterConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Verifies that a configuration, a client manager and a client are present.
+     *
+     * @throws NullPointerException if any of them is missing
+     */
+    public void start() {
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+    }
+
+    public void stop() {
+
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Percolates one document and records the names of the matching queries on it.
+     *
+     * @param entry datum whose document is a JSON {@code String} or an {@code ObjectNode}
+     * @return a one-element list holding the tagged datum, or an empty list when the
+     *         datum or its document is null, is not a JSON object, or cannot be
+     *         parsed / serialized
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        if( entry == null || entry.getDocument() == null ) {
+            return result;
+        }
+
+        Object document = entry.getDocument();
+        String json;
+        ObjectNode node;
+        try {
+            if( document instanceof String ) {
+                json = (String) document;
+                JsonNode parsed = mapper.readTree(json);
+                if( !(parsed instanceof ObjectNode) ) {
+                    LOGGER.warn("Skipping document that is not a JSON object");
+                    return result;
+                }
+                node = (ObjectNode) parsed;
+            } else if( document instanceof ObjectNode ) {
+                node = (ObjectNode) document;
+                // asText() is "" for object nodes, so serialize the whole tree
+                json = mapper.writeValueAsString(node);
+            } else {
+                LOGGER.warn("Skipping unsupported document type {}", document.getClass().getName());
+                return result;
+            }
+        } catch (IOException e) {
+            LOGGER.warn("Unable to read document as JSON", e);
+            return result;
+        }
+
+        PercolateResponse response = manager.getClient().preparePercolate(config.getIndex(), config.getType()).setSource(json).execute().actionGet();
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+        for( String match : response.getMatches() ) {
+            tagArray.add(match);
+        }
+
+        // with() returns the named child object, creating it when absent
+        ObjectNode w2o = node.with("extensions").with("w2o");
+        w2o.put("tags", tagArray);
+
+        if( document instanceof String ) {
+            // Strings are immutable: emit a datum carrying the tagged JSON.
+            // NOTE(review): only the document is carried over - confirm whether any
+            // other StreamsDatum metadata must be copied as well.
+            try {
+                result.add(new StreamsDatum(mapper.writeValueAsString(node)));
+            } catch (IOException e) {
+                LOGGER.warn("Unable to serialize tagged document", e);
+            }
+        } else {
+            // ObjectNode documents were tagged in place
+            result.add(entry);
+        }
+
+        return result;
+    }
+
+    @Override
+    public void prepare(Object o) {
+        start();
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    /**
+     * Moves datums from the input queue through {@link #process} to the output queue
+     * until the current thread is interrupted. Sleeps for a short random interval
+     * only while the input queue is empty.
+     */
+    @Override
+    public void run() {
+
+        while( !Thread.currentThread().isInterrupted() ) {
+            StreamsDatum item = inQueue.poll();
+            if( item == null ) {
+                try {
+                    Thread.sleep(random.nextInt(100));
+                } catch (InterruptedException e) {
+                    // restore the flag so the loop condition sees it and the worker exits
+                    Thread.currentThread().interrupt();
+                }
+                continue;
+            }
+            try {
+                for( StreamsDatum entry : process(item) ) {
+                    outQueue.offer(entry);
+                }
+            } catch (Exception e) {
+                // one failing document must not kill the worker
+                LOGGER.warn("Failed to percolate document", e);
+            }
+        }
+    }
+}
\ No newline at end of file

Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json Wed Feb 26 19:15:42 2014
@@ -0,0 +1,18 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration",
+    "extends": {"$ref":"ElasticsearchConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "index": {
+            "type": "string",
+            "description": "Index to write to"
+        },
+        "type": {
+            "type": "string",
+            "description": "Type to write as"
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -18,6 +18,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.streams.hdfs.HdfsConfiguration;
 
+import java.io.Closeable;
+import java.io.Flushable;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.URI;
@@ -26,7 +28,7 @@ import java.security.PrivilegedException
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
 
@@ -300,13 +302,13 @@ public class WebHdfsPersistWriter implem
         }
     }
 
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
-
-    public Queue<StreamsDatum> getPersistQueue() {
-        return persistQueue;
-    }
+//    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+//        this.persistQueue = persistQueue;
+//    }
+//
+//    public Queue<StreamsDatum> getPersistQueue() {
+//        return persistQueue;
+//    }
 
 
     @Override

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java Wed Feb 26 19:15:42 2014
@@ -20,7 +20,7 @@ public class WebHdfsPersistWriterTask im
     public void run() {
 
         while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
+            if( writer.persistQueue.peek() != null ) {
                 try {
                     StreamsDatum entry = writer.persistQueue.remove();
                     writer.write(entry);

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class MongoPersistWriter implements StreamsPersistWriter, Runnable
@@ -110,14 +111,12 @@ public class MongoPersistWriter implemen
         client.cleanCursors(true);
     }
 
-    @Override
     public void start() {
 
         connectToMongo();
 
     }
 
-    @Override
     public void stop() {
 
         try {
@@ -132,32 +131,39 @@ public class MongoPersistWriter implemen
         }
     }
 
-    @Override
     public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
         this.persistQueue = persistQueue;
     }
 
-    @Override
     public Queue<StreamsDatum> getPersistQueue() {
         return persistQueue;
     }
 
-
-    @Override
     public void run() {
 
-        start();
+        while(true) {
+            if( persistQueue.peek() != null ) {
+                try {
+                    StreamsDatum entry = persistQueue.remove();
+                    write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                Thread.sleep(new Random().nextInt(1));
+            } catch (InterruptedException e) {}
+        }
 
-        Thread task = new Thread(new MongoPersistWriterTask(this));
-        task.start();
+    }
 
-        try {
-            task.join();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            return;
-        }
+    @Override
+    public void prepare(Object configurationObject) {
+        start();
+    }
 
+    @Override
+    public void cleanUp() {
         stop();
     }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Wed Feb 26 19:15:42 2014
@@ -20,6 +20,7 @@ import org.apache.streams.core.StreamsDa
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Wed Feb 26 19:15:42 2014
@@ -2,6 +2,10 @@ package org.apache.streams.twitter.provi
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -15,13 +19,14 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
+import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.*;
@@ -29,7 +34,9 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+
+    private final static String STREAMS_ID = "TwitterTimelineProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
@@ -45,11 +52,10 @@ public class TwitterTimelineProvider imp
         this.config = config;
     }
 
-    protected volatile BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
-
     protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 
     protected Twitter client;
+    protected Iterator<Long> ids;
 
     ListenableFuture providerTaskComplete;
 //
@@ -92,69 +98,118 @@ public class TwitterTimelineProvider imp
         return this.providerQueue;
     }
 
-    public void run() {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+//    public void run() {
+//
+//        LOGGER.info("{} Running", STREAMS_ID);
+//
+//        while( ids.hasNext() ) {
+//            Long currentId = ids.next();
+//            LOGGER.info("Provider Task Starting: {}", currentId);
+//            captureTimeline(currentId);
+//        }
+//
+//        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
+//
+//        client.shutdown();
+//
+//        LOGGER.info("{} Exiting", STREAMS_ID);
+//
+//        while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
+//            try {
+//                Thread.sleep(100);
+//            } catch (InterruptedException e) {}
+//        }
+//    }
 
-        Preconditions.checkNotNull(providerQueue);
+    private void captureTimeline(long currentId) {
 
-        Preconditions.checkNotNull(this.klass);
+        Paging paging = new Paging(1, 200);
+        List<Status> statuses = null;
+        boolean KeepGoing = true;
+        boolean hadFailure = false;
+
+        do
+        {
+            int keepTrying = 0;
+
+            // keep trying to load, give it 5 attempts.
+            //while (keepTrying < 10)
+            while (keepTrying < 1)
+            {
+
+                try
+                {
+                    statuses = client.getUserTimeline(currentId, paging);
+
+                    for (Status tStat : statuses)
+                    {
+//                        if( provider.start != null &&
+//                                provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
+//                        {
+//                            // they hit the last date we wanted to collect
+//                            // we can now exit early
+//                            KeepGoing = false;
+//                        }
+                        // emit the record
+                        String json = DataObjectFactory.getRawJSON(tStat);
+
+                        providerQueue.offer(new StreamsDatum(json));
+
+                    }
+
+                    paging.setPage(paging.getPage() + 1);
+
+                    keepTrying = 10;
+                }
+                catch(TwitterException twitterException) {
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+                }
+                catch(Exception e)
+                {
+                    hadFailure = true;
+                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+                }
+                finally
+                {
+                    // Shutdown the twitter to release the resources
+                    client.shutdown();
+                }
+            }
+        }
+        while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+    }
 
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+    public StreamsResultSet readCurrent() {
 
-        Preconditions.checkNotNull(config.getFollow());
+        Preconditions.checkArgument(ids.hasNext());
 
-        Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
+        LOGGER.info("{} readCurrent", STREAMS_ID);
 
-        Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
-        Iterator<Long> ids = config.getFollow().iterator();
         while( ids.hasNext() ) {
-            Long id = ids.next();
-
-            String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                    .setJSONStoreEnabled(jsonStoreEnabled)
-                    .setAsyncNumThreads(3)
-                    .setRestBaseURL(baseUrl);
-
-            Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-
-            providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
+            Long currentId = ids.next();
+            LOGGER.info("Provider Task Starting: {}", currentId);
+            captureTimeline(currentId);
         }
 
-        for (int i = 0; i < 1; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
-    }
+        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        run();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+        StreamsResultSet result = (StreamsResultSet) ImmutableList.copyOf(Iterators.consumingIterator(providerQueue.iterator()));
+        LOGGER.info("{} providing {} docs", STREAMS_ID, providerQueue.size());
+        LOGGER.info("{} Exiting", STREAMS_ID);
         return result;
+
     }
 
-    @Override
     public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
         throw new NotImplementedException();
     }
 
-    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
         this.start = start;
         this.end = end;
-        run();
+        readCurrent();
         StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
         return result;
     }
@@ -181,6 +236,8 @@ public class TwitterTimelineProvider imp
     @Override
     public void prepare(Object o) {
 
+        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
         Preconditions.checkNotNull(providerQueue);
 
         Preconditions.checkNotNull(this.klass);
@@ -197,33 +254,29 @@ public class TwitterTimelineProvider imp
         Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
         Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
 
-        Iterator<Long> ids = config.getFollow().iterator();
-        while( ids.hasNext() ) {
-            Long id = ids.next();
+        ids = config.getFollow().iterator();
 
-            String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+        String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
 
-            ConfigurationBuilder builder = new ConfigurationBuilder()
-                    .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-                    .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-                    .setOAuthAccessToken(config.getOauth().getAccessToken())
-                    .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                    .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                    .setJSONStoreEnabled(jsonStoreEnabled)
-                    .setAsyncNumThreads(3)
-                    .setRestBaseURL(baseUrl);
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+                .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+                .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+                .setOAuthAccessToken(config.getOauth().getAccessToken())
+                .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+                .setJSONStoreEnabled(jsonStoreEnabled)
+                .setAsyncNumThreads(3)
+                .setRestBaseURL(baseUrl);
 
-            Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-            providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
-        }
+        client = new TwitterFactory(builder.build()).getInstance();
 
-        for (int i = 0; i < 1; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
-        }
     }
 
     @Override
     public void cleanUp() {
+
+        client.shutdown();
+
         shutdownAndAwaitTermination(executor);
     }
 }

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java Wed Feb 26 19:15:42 2014
@@ -60,7 +60,7 @@ public class TwitterTimelineProviderTask
                         // emit the record
                         String json = DataObjectFactory.getRawJSON(tStat);
 
-                        provider.inQueue.offer(json);
+                        //provider.offer(json);
 
                     }
 

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.core;
+
+import java.io.Serializable;
+
+/**
+ * Lifecycle contract shared by stream operations.
+ * <p>
+ * Operations are {@link Serializable}. {@link #prepare(Object)} is invoked after initialization/serialization, so
+ * anything that cannot be serialized should be created there rather than in a constructor.
+ */
+public interface StreamsOperation extends Serializable {
+
+    /**
+     * Invoked after initialization/serialization; create any non-serializable objects here.
+     *
+     * @param configurationObject any object that helps initialize the operation, e.g. a Map, JobContext or
+     *                            Properties. Its type depends on where the operation runs (e.g. Hadoop, Storm,
+     *                            or locally).
+     */
+    void prepare(Object configurationObject);
+
+    /**
+     * Releases resources such as open connections.
+     * <p>
+     * An attempt to call this method is made when the stream shuts down, but there is no guarantee that it will
+     * ever be called.
+     */
+    void cleanUp();
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.core.builders;
+
+/**
+ * Unchecked exception signalling that a data stream is malformed in some way.
+ */
+public class InvalidStreamException extends RuntimeException {
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public InvalidStreamException() {
+    }
+
+    /**
+     * Creates an exception describing what is wrong with the stream.
+     *
+     * @param message detail message
+     */
+    public InvalidStreamException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception wrapping an underlying failure; the detail message is derived from {@code cause}.
+     *
+     * @param cause the underlying failure
+     */
+    public InvalidStreamException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with both a detail message and the underlying failure.
+     *
+     * @param message detail message
+     * @param cause   the underlying failure
+     */
+    public InvalidStreamException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,256 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link org.apache.streams.core.builders.StreamBuilder} implementation to run a data processing stream in a single
+ * JVM across many threads.  Depending on your data stream, the JVM heap may need to be set to a high value. The
+ * default implementation connects stream components with unbounded
+ * {@link java.util.concurrent.ConcurrentLinkedQueue}s; each processor and writer receives its own inbound queue,
+ * created by serialization-cloning the queue supplied to the constructor.
+ */
+public class LocalStreamBuilder implements StreamBuilder{
+
+    /** Provider components, keyed by id. */
+    private Map<String, StreamComponent> providers;
+    /** Processor and persist-writer components, keyed by id. */
+    private Map<String, StreamComponent> components;
+    /** Prototype queue; each processor/writer inbound queue is a serialized clone of it. */
+    private Queue<StreamsDatum> queue;
+    /** Configuration handed to every task via setStreamConfig; may be null. */
+    private Map<String, Object> streamConfig;
+    private ExecutorService executor;
+    /** One per provider plus numTasks per processor/writer; sizes the thread pool. */
+    private int totalTasks;
+
+    /**
+     * Creates a builder that uses {@link ConcurrentLinkedQueue}s and no stream configuration.
+     */
+    public LocalStreamBuilder(){
+        this(new ConcurrentLinkedQueue<StreamsDatum>(), null);
+    }
+
+    /**
+     * Creates a builder that uses {@link ConcurrentLinkedQueue}s.
+     *
+     * @param streamConfig configuration handed to every task; may be null
+     */
+    public LocalStreamBuilder(Map<String, Object> streamConfig) {
+        this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig);
+    }
+
+    /**
+     * Creates a builder with no stream configuration.
+     *
+     * @param queueType prototype queue, cloned by serialization to create each component's inbound queue
+     */
+    public LocalStreamBuilder(Queue<StreamsDatum> queueType) {
+        this(queueType, null);
+    }
+
+    /**
+     * Creates a builder.
+     *
+     * @param queueType    prototype queue, cloned by serialization to create each component's inbound queue
+     * @param streamConfig configuration handed to every task; may be null
+     */
+    public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, Object> streamConfig) {
+        this.queue = queueType;
+        this.providers = new HashMap<String, StreamComponent>();
+        this.components = new HashMap<String, StreamComponent>();
+        this.streamConfig = streamConfig;
+        this.totalTasks = 0;
+    }
+
+    @Override
+    public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider, sequence));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider, start, end));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
+        validateId(id);
+        StreamComponent comp = new StreamComponent(id, processor, cloneQueue(), numTasks);
+        this.components.put(id, comp);
+        connectToOtherComponents(inBoundIds, comp);
+        this.totalTasks += numTasks;
+        return this;
+    }
+
+    @Override
+    public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
+        validateId(id);
+        StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), numTasks);
+        this.components.put(id, comp);
+        connectToOtherComponents(inBoundIds, comp);
+        this.totalTasks += numTasks;
+        return this;
+    }
+
+    /**
+     * Runs the data stream in this JVM and blocks until completion.
+     * <p>
+     * Processor and writer tasks are submitted first, then provider tasks. The provider tasks are polled every
+     * 10 seconds; once none reports running, downstream tasks are stopped component by component. If the calling
+     * thread is interrupted, every task (providers first) is asked to stop, the executor is given 30 seconds to
+     * terminate before being forced down, and the thread's interrupt status is restored.
+     *
+     * @throws InvalidStreamException if no providers, processors or writers have been added
+     */
+    @Override
+    public void start() {
+        if(this.totalTasks <= 0) {
+            // Executors.newFixedThreadPool(0) would otherwise fail with an opaque IllegalArgumentException.
+            throw new InvalidStreamException("Cannot start a stream that has no components.");
+        }
+        boolean isRunning = true;
+        this.executor = Executors.newFixedThreadPool(this.totalTasks);
+        Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
+        Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+        try {
+            for(StreamComponent comp : this.components.values()) {
+                int tasks = comp.getNumTasks();
+                List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+                for(int i=0; i < tasks; ++i) {
+                    StreamsTask task = comp.createConnectedTask();
+                    task.setStreamConfig(this.streamConfig);
+                    this.executor.submit(task);
+                    compTasks.add(task);
+                }
+                streamsTasks.put(comp.getId(), compTasks);
+            }
+            for(StreamComponent prov : this.providers.values()) {
+                StreamsTask task = prov.createConnectedTask();
+                task.setStreamConfig(this.streamConfig);
+                this.executor.submit(task);
+                provTasks.put(prov.getId(), (StreamsProviderTask) task);
+            }
+
+            while(isRunning) {
+                isRunning = false;
+                for(StreamsProviderTask task : provTasks.values()) {
+                    isRunning = isRunning || task.isRunning();
+                }
+                if(isRunning) {
+                    Thread.sleep(10000);
+                }
+            }
+            this.executor.shutdown();
+            //complete stream shut down gracefully
+            for(StreamComponent prov : this.providers.values()) {
+                shutDownTask(prov, streamsTasks);
+            }
+            //need to make this configurable
+            if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+                this.executor.shutdownNow();
+                this.executor.awaitTermination(10, TimeUnit.SECONDS);
+            }
+        } catch (InterruptedException e){
+            // Stop the providers first so they stop producing, then the downstream tasks. Give the stream 30 secs to
+            // shut down gracefully, then force shutdown otherwise.
+            for(StreamsProviderTask task : provTasks.values()) {
+                task.stopTask();
+            }
+            for(List<StreamsTask> tasks : streamsTasks.values()) {
+                for(StreamsTask task : tasks) {
+                    task.stopTask();
+                }
+            }
+            this.executor.shutdown();
+            try {
+                if(!this.executor.awaitTermination(30, TimeUnit.SECONDS)){
+                    this.executor.shutdownNow();
+                }
+            }catch (InterruptedException ie) {
+                this.executor.shutdownNow();
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+            // Restored only after the graceful wait above, which would otherwise be cut short immediately.
+            Thread.currentThread().interrupt();
+        }
+
+    }
+
+    /**
+     * Shuts down the running tasks in a pseudo depth-first-search fashion, starting at {@code comp} and recursing into
+     * its downstream components. A component's tasks are only stopped once every task of each upstream
+     * (non-provider) component reports that it is no longer running. This method then blocks, polling every 500 ms,
+     * until each of the component's tasks reports that it has stopped. Whether a task drains its inbound queue before
+     * stopping is up to the task implementation.
+     * @param comp StreamComponent to shut down.
+     * @param streamTasks the list of non-StreamsProvider tasks for this stream.
+     * @throws InterruptedException if interrupted while waiting for tasks to stop
+     */
+    private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
+        List<StreamsTask> tasks = streamTasks.get(comp.getId());
+        if(tasks != null) { //not a StreamProvider
+            boolean parentsShutDown = true;
+            for(StreamComponent parent : comp.getUpStreamComponents()) {
+                List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
+                //if parentTasks == null, it's a provider and is not running anymore
+                if(parentTasks != null) {
+                    for(StreamsTask task : parentTasks) {
+                        parentsShutDown = parentsShutDown && !task.isRunning();
+                    }
+                }
+            }
+            if(parentsShutDown) {
+                for(StreamsTask task : tasks) {
+                    task.stopTask();
+                }
+                for(StreamsTask task : tasks) {
+                    while(task.isRunning()) {
+                        Thread.sleep(500);
+                    }
+                }
+            }
+        }
+        Collection<StreamComponent> children = comp.getDownStreamComponents();
+        if(children != null) {
+            for(StreamComponent child : children) {
+                shutDownTask(child, streamTasks);
+            }
+        }
+    }
+
+    /**
+     * NOT IMPLEMENTED. Currently does nothing.
+     */
+    @Override
+    public void stop() {
+
+    }
+
+    /**
+     * Wires {@code toBeConnected} so that it receives data from each listed upstream provider or component.
+     * @param connectToIds ids of already-registered upstream providers/components
+     * @param toBeConnected the downstream component whose inbound queue the upstream components will write to
+     * @throws InvalidStreamException if an id is not registered
+     */
+    private void connectToOtherComponents(String[] connectToIds, StreamComponent toBeConnected) {
+        for(String id : connectToIds) {
+            StreamComponent upStream = null;
+            if(this.providers.containsKey(id)) {
+                upStream = this.providers.get(id);
+            }
+            else if(this.components.containsKey(id)) {
+                upStream = this.components.get(id);
+            }
+            else {
+                throw new InvalidStreamException("Cannot connect to id, "+id+", because id does not exist.");
+            }
+            upStream.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue());
+            toBeConnected.addInboundQueue(upStream);
+        }
+    }
+
+    /**
+     * @throws InvalidStreamException if {@code id} is already used by a provider or component
+     */
+    private void validateId(String id) {
+        if(this.providers.containsKey(id) || this.components.containsKey(id)) {
+            throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component");
+        }
+    }
+
+    /**
+     * Copies the prototype queue with SerializationUtil.cloneBySerialization to serve as a new inbound queue.
+     */
+    @SuppressWarnings("unchecked") // the clone is made from this.queue, a Queue<StreamsDatum>
+    private Queue<StreamsDatum> cloneQueue() {
+        return (Queue<StreamsDatum>)SerializationUtil.cloneBySerialization(this.queue);
+    }
+
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,97 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+
+/**
+ * Interface for building data streams.
+ *
+ * <pre>
+ *     StreamBuilder builder = ...
+ *     builder.newReadCurrentStream(. . .)
+ *            .addStreamsProcessor(. . .)
+ *            ...
+ *            .addStreamsPersistWriter(. . .);
+ *     builder.start();
+ * </pre>
+ *
+ * Every id given to a provider, processor or writer must be unique across the entire stream.
+ */
+public interface StreamBuilder {
+
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
+     * @param processorId unique id for this processor - must be unique across the entire stream
+     * @param processor the processor to execute
+     * @param numTasks the number of instances of this processor to run concurrently
+     * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation}s that this processor will
+     *                     receive data from.
+     * @return this builder, to allow chaining
+     */
+    public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
+     * @param persistWriterId unique id for this writer - must be unique across the entire stream
+     * @param writer the writer to execute
+     * @param numTasks the number of instances of this writer to run concurrently
+     * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation}s that this writer will
+     *                     receive data from.
+     * @return this builder, to allow chaining
+     */
+    public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce data.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @return this builder, to allow chaining
+     */
+    public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)} to produce data.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @param sequence sequence to pass to the {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)}
+     *                 method
+     * @return this builder, to allow chaining
+     */
+    public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider#readRange(DateTime, DateTime)} to produce data. Whether the start
+     * and end dates are inclusive or exclusive is up to the implementation.
+     * @param streamId unique id for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @param start start date
+     * @param end end date
+     * @return this builder, to allow chaining
+     */
+    public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+
+    /**
+     * Builds the stream, and starts it or submits it based on implementation.
+     */
+    public void start();
+
+    /**
+     * Stops the stream's processing.  No guarantee of a smooth shutdown. Optional method; may not be implemented in
+     * all cases.
+     */
+    public void stop();
+
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,217 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.core.tasks.StreamsProcessorTask;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * Stores the implementations of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
+ * to and the necessary metadata to construct a data stream.
+ * <p>
+ * Equality and hashing are based solely on the component id.
+ */
+public class StreamComponent {
+
+    // Indices into dateRange, which always has length 2.
+    private static final int START = 0;
+    private static final int END = 1;
+
+    private String id;
+    /** Components that supply data through this component's inbound queue (parents). */
+    private Set<StreamComponent> inBound;
+    /** Downstream components (children), each mapped to the queue this component writes to for it. */
+    private Map<StreamComponent, Queue<StreamsDatum>> outBound;
+    private Queue<StreamsDatum> inQueue;
+    // Each constructor sets only one of provider / processor / writer.
+    private StreamsProvider provider;
+    private StreamsProcessor processor;
+    private StreamsPersistWriter writer;
+    /** {start, end} for a range read; null unless the date-range constructor was used. */
+    private DateTime[] dateRange;
+    /** Sequence for a read-new stream; null unless the sequence constructor was used. */
+    private BigInteger sequence;
+    private int numTasks = 1;
+
+    /**
+     * Creates a provider component whose task reads the provider's current data.
+     * @param id unique component id
+     * @param provider the provider to run
+     */
+    public StreamComponent(String id, StreamsProvider provider) {
+        this.id = id;
+        this.provider = provider;
+        initializePrivateVariables();
+    }
+
+    /**
+     * Creates a provider component whose task reads a date range.
+     * @param id unique component id
+     * @param provider the provider to run
+     * @param start start of the range
+     * @param end end of the range
+     */
+    public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+        this.id = id;
+        this.provider = provider;
+        this.dateRange = new DateTime[2];
+        this.dateRange[START] = start;
+        this.dateRange[END] = end;
+        initializePrivateVariables();
+    }
+
+
+    /**
+     * Creates a provider component whose task reads new data from a sequence.
+     * @param id unique component id
+     * @param provider the provider to run
+     * @param sequence the sequence to read from
+     */
+    public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+        this.id = id;
+        this.provider = provider;
+        this.sequence = sequence;
+        // Without this, inBound/outBound stay null and connecting the component throws NullPointerException.
+        initializePrivateVariables();
+    }
+
+    /**
+     * Creates a processor component.
+     * @param id unique component id
+     * @param processor the processor to run
+     * @param inQueue queue this component's tasks read from
+     * @param numTasks number of concurrent tasks to run
+     */
+    public StreamComponent(String id, StreamsProcessor processor, Queue<StreamsDatum> inQueue, int numTasks) {
+        this.id = id;
+        this.processor = processor;
+        this.inQueue = inQueue;
+        this.numTasks = numTasks;
+        initializePrivateVariables();
+    }
+
+    /**
+     * Creates a persist-writer component.
+     * @param id unique component id
+     * @param writer the writer to run
+     * @param inQueue queue this component's tasks read from
+     * @param numTasks number of concurrent tasks to run
+     */
+    public StreamComponent(String id, StreamsPersistWriter writer, Queue<StreamsDatum> inQueue, int numTasks) {
+        this.id = id;
+        this.writer = writer;
+        this.inQueue = inQueue;
+        this.numTasks = numTasks;
+        initializePrivateVariables();
+    }
+
+    private void initializePrivateVariables() {
+        this.inBound = new HashSet<StreamComponent>();
+        this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>();
+    }
+
+    /**
+     * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component.
+     * @param component the downstream component that supplies its inbound queue
+     * @param queue the queue to put processed/provided datums on
+     */
+    public void addOutBoundQueue(StreamComponent component, Queue<StreamsDatum> queue) {
+        this.outBound.put(component, queue);
+    }
+
+    /**
+     * Add a component that supplies data through the inbound queue.
+     * @param component that supplies data through the inbound queue
+     */
+    public void addInboundQueue(StreamComponent component) {
+        this.inBound.add(component);
+    }
+
+    /**
+     * The components that are immediately downstream of this component (aka child nodes).
+     * @return live view of the child nodes of this component
+     */
+    public Collection<StreamComponent> getDownStreamComponents() {
+        return this.outBound.keySet();
+    }
+
+    /**
+     * The components that are immediately upstream of this component (aka parent nodes).
+     * @return the parent nodes of this component (the internal set, not a copy)
+     */
+    public Collection<StreamComponent> getUpStreamComponents() {
+        return this.inBound;
+    }
+
+    /**
+     * The inbound queue for this component.
+     * @return inbound queue; null for provider components
+     */
+    public Queue<StreamsDatum> getInBoundQueue() {
+        return this.inQueue;
+    }
+
+    /**
+     * The number of tasks to run for this component.
+     * @return number of tasks (1 for provider components)
+     */
+    public int getNumTasks() {
+        return this.numTasks;
+    }
+
+    /**
+     * Creates a {@link org.apache.streams.core.tasks.StreamsTask} that runs a serialization clone of this component,
+     * with its inbound and outbound queues connected to the parent and child nodes.
+     * @return StreamsTask for this component
+     * @throws InvalidStreamException if this component has no provider, processor or writer
+     */
+    public StreamsTask createConnectedTask() {
+        StreamsTask task;
+        if(this.processor != null) {
+            task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+            task.addInputQueue(this.inQueue);
+            for(Queue<StreamsDatum> q : this.outBound.values()) {
+                task.addOutputQueue(q);
+            }
+        }
+        else if(this.writer != null) {
+            task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+            task.addInputQueue(this.inQueue);
+        }
+        else if(this.provider != null) {
+            StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+            if(this.dateRange == null && this.sequence == null) {
+                task = new StreamsProviderTask(prov);
+            }
+            else if(this.sequence != null) {
+                task = new StreamsProviderTask(prov, this.sequence);
+            }
+            else {
+                task = new StreamsProviderTask(prov, this.dateRange[START], this.dateRange[END]);
+            }
+            for(Queue<StreamsDatum> q : this.outBound.values()) {
+                task.addOutputQueue(q);
+            }
+        }
+        else {
+            throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
+        }
+        return task;
+    }
+
+    /**
+     * The unique id of this component.
+     * @return component id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    @Override
+    public int hashCode() {
+        return this.id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if(o instanceof StreamComponent)
+            return this.id.equals(((StreamComponent) o).id);
+        else
+            return false;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,89 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Base {@link StreamsTask} that holds the task's input and output queues and provides helpers for reading from the
+ * inputs and writing to the outputs.
+ */
+public abstract class BaseStreamsTask implements StreamsTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class);
+
+    private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
+    private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
+    /** Index of the input queue to poll next (round robin). */
+    private int inIndex = 0;
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueues.add(inputQueue);
+    }
+
+    @Override
+    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+        this.outQueues.add(outputQueue);
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        return this.inQueues;
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getOutputQueues() {
+        return this.outQueues;
+    }
+
+    /**
+     * NOT NECESSARY AT THE MOMENT.  MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASKS HAVE A
+     * MAX OF 1 INPUT QUEUE.
+     * <p>
+     * Round robins through the input queues to get the next StreamsDatum, polling each queue at most once per call.
+     * @return the next StreamsDatum, or null if all input queues are empty or no input queue has been added.
+     */
+    protected StreamsDatum getNextDatum() {
+        if(this.inQueues.isEmpty()) {
+            // Nothing to poll; inQueues.get(0) would otherwise throw IndexOutOfBoundsException.
+            return null;
+        }
+        int startIndex = this.inIndex;
+        int index = startIndex;
+        StreamsDatum datum = null;
+        do {
+            datum = this.inQueues.get(index).poll();
+            index = getNextInputQueueIndex();
+        } while( datum == null && startIndex != index);
+        return datum;
+    }
+
+    /**
+     * Adds a StreamsDatum to the outgoing queues.  With exactly one queue the datum itself is offered; otherwise a
+     * clone created by serialization is offered to each queue. Failures are logged and that queue is skipped.
+     * @param datum the datum to emit
+     */
+    protected void addToOutgoingQueue(StreamsDatum datum) {
+        if(this.outQueues.size() == 1) {
+            this.outQueues.get(0).offer(datum);
+        }
+        else {
+            for(Queue<StreamsDatum> queue : this.outQueues) {
+                try {
+                    queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+                } catch (RuntimeException e) {
+                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
+                    // Pass the exception as the trailing argument, not as a {} value, so its stack trace is logged.
+                    LOGGER.error("Exception while offering StreamsDatum to outgoing queue", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Advances the round-robin pointer, wrapping to 0 after the last input queue.
+     * @return the new value of the pointer
+     */
+    private int getNextInputQueueIndex() {
+        ++this.inIndex;
+        if(this.inIndex >= this.inQueues.size()) {
+            this.inIndex = 0;
+        }
+        return this.inIndex;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Merges all of this task's input queues into its output queues. It repeatedly takes the next available datum
+ * from the input queues and emits it downstream, sleeping whenever no datum is available.
+ * <p>
+ * NOT USED.  When joins/partitions are implemented, a similar pattern could be followed. Done only as a basic proof
+ * of concept.
+ */
+public class StreamsMergeTask extends BaseStreamsTask {
+
+    private final AtomicBoolean keepRunning;
+    private final AtomicBoolean isRunning;
+    private final long sleepTime;
+
+    /**
+     * Creates a merge task that sleeps {@code DEFAULT_SLEEP_TIME_MS} milliseconds whenever no input is available.
+     */
+    public StreamsMergeTask() {
+        this(DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     * @param sleepTime time, in milliseconds, to sleep when no input is available
+     */
+    public StreamsMergeTask(long sleepTime) {
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        // Reported as running from construction until run() exits, like the other local-mode tasks.
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Asks the task to stop; run() exits once its current loop iteration finishes.
+     */
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    /**
+     * This task takes no configuration, so the supplied config is ignored.
+     * @param config ignored
+     */
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        // intentionally empty: nothing to configure
+    }
+
+    /**
+     * @return true from construction until run() has exited, false afterwards
+     */
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(this.keepRunning.get()) {
+                StreamsDatum datum = super.getNextDatum();
+                if(datum != null) {
+                    super.addToOutgoingQueue(datum);
+                }
+                else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt status so code further up the stack can still observe it;
+                        // clearing keepRunning ends the loop.
+                        Thread.currentThread().interrupt();
+                        this.keepRunning.set(false);
+                    }
+                }
+            }
+        } finally {
+            this.isRunning.set(false);
+        }
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,103 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsPersistWriterTask extends BaseStreamsTask {
+
+
+
+    private StreamsPersistWriter writer;
+    private long sleepTime;
+    private AtomicBoolean keepRunning;
+    private Map<String, Object> streamConfig;
+    private Queue<StreamsDatum> inQueue;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Default constructor.  Uses default sleep of 500ms when inbound queue is empty.
+     * @param writer writer to execute in task
+     */
+    public StreamsPersistWriterTask(StreamsPersistWriter writer) {
+        this(writer, DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     *
+     * @param writer writer to execute in task
+     * @param sleepTime time to sleep when inbound queue is empty.
+     */
+    public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+        this.writer = writer;
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.streamConfig = config;
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueue = inputQueue;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.writer.prepare(this.streamConfig);
+            StreamsDatum datum = this.inQueue.poll();
+            while(datum != null || this.keepRunning.get()) {
+                if(datum != null) {
+                    this.writer.write(datum);
+                }
+                else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+                datum = this.inQueue.poll();
+            }
+
+        } finally {
+            this.writer.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+
+    @Override
+    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setOutputQueue()");
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+        queues.add(this.inQueue);
+        return queues;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,102 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProcessorTask extends BaseStreamsTask {
+
+
+    private StreamsProcessor processor;
+    private long sleepTime;
+    private AtomicBoolean keepRunning;
+    private Map<String, Object> streamConfig;
+    private Queue<StreamsDatum> inQueue;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Default constructor, uses default sleep time of 500ms when inbound queue is empty
+     * @param processor process to run in task
+     */
+    public StreamsProcessorTask(StreamsProcessor processor) {
+        this(processor, DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     *
+     * @param processor processor to run in task
+     * @param sleepTime time to sleep when incoming queue is empty
+     */
+    public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+        this.processor = processor;
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.streamConfig = config;
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueue = inputQueue;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.processor.prepare(this.streamConfig);
+            StreamsDatum datum = this.inQueue.poll();
+            while(datum != null || this.keepRunning.get()) {
+                if(datum != null) {
+                    List<StreamsDatum> output = this.processor.process(datum);
+                    if(output != null) {
+                        for(StreamsDatum outDatum : output) {
+                            super.addToOutgoingQueue(outDatum);
+                        }
+                    }
+                }
+                else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+                datum = this.inQueue.poll();
+            }
+
+        } finally {
+            this.processor.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+        queues.add(this.inQueue);
+        return queues;
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,132 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProviderTask extends BaseStreamsTask {
+
+    private static enum Type {
+        READ_CURRENT,
+        READ_NEW,
+        READ_RANGE
+    }
+
+    private static final int START = 0;
+    private static final int END = 1;
+
+    private StreamsProvider provider;
+    private AtomicBoolean keepRunning;
+    private Type type;
+    private BigInteger sequence;
+    private DateTime[] dateRange;
+    private Map<String, Object> config;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
+     * @param provider
+     */
+    public StreamsProviderTask(StreamsProvider provider) {
+        this.provider = provider;
+        this.type = Type.READ_CURRENT;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
+     * @param provider
+     * @param sequence
+     */
+    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+        this.provider = provider;
+        this.type = Type.READ_NEW;
+        this.sequence = sequence;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
+     * @param provider
+     * @param start
+     * @param end
+     */
+    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+        this.provider = provider;
+        this.type = Type.READ_RANGE;
+        this.dateRange = new DateTime[2];
+        this.dateRange[START] = start;
+        this.dateRange[END] = end;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.provider.prepare(this.config); //TODO allow for configuration objects
+            StreamsResultSet resultSet = null;
+            this.isRunning.set(true);
+            switch(this.type) {
+                case READ_CURRENT: resultSet = this.provider.readCurrent();
+                    break;
+                case READ_NEW: resultSet = this.provider.readNew(this.sequence);
+                    break;
+                case READ_RANGE: resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+                    break;
+                default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+            }
+            for(StreamsDatum datum : resultSet) {
+                if(!this.keepRunning.get()) {
+                    break;
+                }
+                if(datum != null)
+                 super.addToOutgoingQueue(datum);
+                else {
+                    try {
+                        Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+            }
+
+        } catch( Exception e ) {
+            e.printStackTrace();
+        } finally
+        {
+            this.provider.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+}

Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Interface for all tasks that will be used to execute instances of {@link org.apache.streams.core.StreamsOperation}
+ * in local mode.
+ */
+public interface StreamsTask extends Runnable {
+
+    /** Default time, in milliseconds, that a task sleeps when no input is available. */
+    long DEFAULT_SLEEP_TIME_MS = 500;
+
+    /**
+     * Informs the task to stop. Tasks may or may not try to empty their inbound queues before halting.
+     */
+    void stopTask();
+
+    /**
+     * Adds an input {@link java.util.Queue} for this task.  Tasks that do not consume input may throw
+     * {@link UnsupportedOperationException}.
+     * @param inputQueue queue the task reads datums from
+     */
+    void addInputQueue(Queue<StreamsDatum> inputQueue);
+
+    /**
+     * Adds an output {@link java.util.Queue} for this task.  Tasks that do not produce output may throw
+     * {@link UnsupportedOperationException}.
+     * @param outputQueue queue the task writes datums to
+     */
+    void addOutputQueue(Queue<StreamsDatum> outputQueue);
+
+    /**
+     * Sets the configuration object that will be shared and passed to all instances of StreamsTask.
+     * @param config optional configuration information
+     */
+    void setStreamConfig(Map<String, Object> config);
+
+    /**
+     * Returns whether the task is still running.
+     * @return true while the task has not completed, false otherwise
+     */
+    boolean isRunning();
+
+    /**
+     * Returns the input queues that have been set for this task.
+     * @return list of input queues
+     */
+    List<Queue<StreamsDatum>> getInputQueues();
+
+    /**
+     * Returns the output queues that have been set for this task.
+     * @return list of output queues
+     */
+    List<Queue<StreamsDatum>> getOutputQueues();
+
+}