You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:27 UTC

[49/71] [abbrv] git commit: Adding support for perpetual streams, minor refactoring, work on moreover provider

Adding support for perpetual streams, minor refactoring, work on moreover provider

git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572821 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ce6961d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ce6961d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ce6961d6

Branch: refs/heads/master
Commit: ce6961d6c54ba5367ef3f664c8dccec3448967e6
Parents: 2eb90cd
Author: sblackmon <sb...@unknown>
Authored: Fri Feb 28 05:14:16 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Fri Feb 28 05:14:16 2014 +0000

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |  2 +-
 .../streams/console/ConsolePersistReader.java   |  5 ++
 .../ElasticsearchPersistReader.java             |  5 ++
 .../streams/hdfs/WebHdfsPersistWriter.java      |  8 +--
 .../streams/kafka/KafkaPersistReader.java       | 40 ++++--------
 .../streams-provider-moreover/pom.xml           |  4 ++
 .../streams/data/moreover/MoreoverClient.java   |  2 +-
 .../streams/data/moreover/MoreoverProvider.java | 66 ++++++++++----------
 .../data/moreover/MoreoverResultSetWrapper.java | 31 +--------
 .../com/moreover/MoreoverConfiguration.json     |  1 +
 .../streams/rss/provider/RssStreamProvider.java |  5 +-
 .../twitter/provider/TwitterStreamProvider.java |  6 +-
 .../provider/TwitterTimelineProvider.java       |  5 ++
 .../apache/streams/core/StreamsProvider.java    |  6 +-
 .../core/builders/LocalStreamBuilder.java       | 12 +++-
 .../streams/core/builders/StreamBuilder.java    |  9 +++
 .../streams/core/builders/StreamComponent.java  | 13 +++-
 .../streams/core/tasks/StreamsProviderTask.java | 53 +++++++++++-----
 18 files changed, 146 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 07660ee..c24a33b 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,7 +44,7 @@
         <!--<module>streams-provider-datasift</module>-->
         <!--<module>streams-provider-facebook</module>-->
         <!--<module>streams-provider-gnip</module>-->
-        <!--<module>streams-provider-moreover</module>-->
+        <module>streams-provider-moreover</module>
         <module>streams-provider-twitter</module>
         <!--<module>streams-provider-sysomos</module>-->
         <module>streams-provider-rss</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 2addc65..a3967e7 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -45,6 +45,11 @@ public class ConsolePersistReader implements StreamsPersistReader {
     }
 
     @Override
+    public void startStream() {
+        // no op
+    }
+
+    @Override
     public StreamsResultSet readAll() {
         return readCurrent();
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8dbdeb0..2129eca 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -121,6 +121,11 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     }
 
     @Override
+    public void startStream() {
+
+    }
+
+    @Override
     public void prepare(Object o) {
 
         // If we haven't already set up the search, then set up the search.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index cb3ceec..0fc8407 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -57,9 +57,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
     private ObjectMapper mapper = new ObjectMapper();
 
-    private HdfsConfiguration hdfsConfiguration;
+    private HdfsWriterConfiguration hdfsConfiguration;
 
-    public WebHdfsPersistWriter(HdfsConfiguration hdfsConfiguration) {
+    public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
     }
 
@@ -180,7 +180,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         this.fileLineCounter = 0;
 
         // Create the path for where the file is going to live.
-        Path filePath = this.path.suffix("/" + this.filePart + "-" + new Date().getTime() + ".tsv");
+        Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
 
         try
         {
@@ -248,7 +248,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     @Override
     public void prepare(Object configurationObject) {
         connectToWebHDFS();
-        path = new Path(hdfsConfiguration.getPath());
+        path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index fd0dfdf..af43d4e 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-public class KafkaPersistReader implements StreamsPersistReader, Serializable, Runnable {
+public class KafkaPersistReader implements StreamsPersistReader, Serializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
 
@@ -62,7 +62,9 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
         this.config = config;
     }
 
-    public void start() {
+    @Override
+    public void startStream() {
+
         Properties props = new Properties();
         props.setProperty("serializer.encoding", "UTF8");
 
@@ -81,23 +83,6 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
 
     }
 
-    public void stop() {
-        consumerConnector.shutdown();
-        while( !executor.isTerminated()) {
-            try {
-                executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {}
-        }
-    }
-
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
-
-    public Queue<StreamsDatum> getPersistQueue() {
-        return this.persistQueue;
-    }
-
     @Override
     public StreamsResultSet readAll() {
         return readCurrent();
@@ -129,20 +114,17 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
     }
 
     @Override
-    public void run() {
-        start();
-
-        // once this class can be told when to shutdown by streams, it will run stop
-        // stop();
-    }
-
-    @Override
     public void prepare(Object configurationObject) {
-        start();
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        consumerConnector.shutdown();
+        while( !executor.isTerminated()) {
+            try {
+                executor.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {}
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml
index 6b281d9..4988715 100644
--- a/streams-contrib/streams-provider-moreover/pom.xml
+++ b/streams-contrib/streams-provider-moreover/pom.xml
@@ -41,6 +41,10 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index 0ab6e33..ec968fe 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -20,7 +20,7 @@ import java.util.Date;
 public class MoreoverClient {
     private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
 
-    private static final String BASE_URL = "http://metabase.moreover.com/com.facebook.api/v10/articles?key=%s&limit=%s&sequence_id=%s";
+    private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
     private final String id;
     private String apiKey;
     private BigInteger lastSequenceId = BigInteger.ZERO;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 133d6e3..4772f5e 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -19,57 +19,47 @@ import java.util.concurrent.*;
 
 public class MoreoverProvider implements StreamsProvider {
 
-    private static Logger logger = LoggerFactory.getLogger(MoreoverProvider.class);
+    public final static String STREAMS_ID = "MoreoverProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
 
     protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
 
-    private List<ExecutorService> tasks = new LinkedList<ExecutorService>();
     private List<MoreoverKeyData> keys;
-    private boolean started = false;
+
+    private MoreoverConfiguration config;
+
+    private ExecutorService executor;
 
     public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
+        this.config = moreoverConfiguration;
         this.keys = Lists.newArrayList();
-        for( MoreoverKeyData apiKey : moreoverConfiguration.getApiKeys()) {
+        for( MoreoverKeyData apiKey : config.getApiKeys()) {
             this.keys.add(apiKey);
         }
-        this.keys = Arrays.asList();
     }
 
-    public MoreoverProvider(MoreoverKeyData... keys) {
-        this.keys = Arrays.asList(keys);
-    }
+    public void startStream() {
 
-    @Override
-    public synchronized void start() {
-        logger.trace("Starting Producer");
-        if(!started) {
-            logger.trace("Producer not started.  Initializing");
-            for(MoreoverKeyData key : keys) {
-                MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
-                ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-                service.scheduleWithFixedDelay(task, 0, MoreoverProviderTask.LATENCY, TimeUnit.SECONDS);
-                logger.info("Started producer for {} with service {}", key.getKey(), service.toString());
-                this.tasks.add(service);
-            }
-            started = true;
+        for(MoreoverKeyData key : keys) {
+            MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
+            executor.submit(new Thread(task));
+            LOGGER.info("Started producer for {}", key.getKey());
         }
-    }
 
-    @Override
-    public synchronized void stop() {
-        for(ExecutorService service: tasks) {
-            service.shutdown();
-        }
-    }
-
-    @Override
-    public Queue<StreamsDatum> getProviderQueue() {
-        return providerQueue;
     }
 
     @Override
     public StreamsResultSet readCurrent() {
-        return null;
+        LOGGER.debug("readCurrent");
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+
+        LOGGER.info("Exiting");
+
+        return result;
     }
 
     @Override
@@ -82,4 +72,14 @@ public class MoreoverProvider implements StreamsProvider {
         return null;
     }
 
+    @Override
+    public void prepare(Object configurationObject) {
+        LOGGER.debug("Prepare");
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
index 9c161c7..0785fac 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
@@ -5,37 +5,12 @@ import org.apache.streams.core.StreamsResultSet;
 
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.Queue;
 
-public class MoreoverResultSetWrapper implements StreamsResultSet {
-
-    private MoreoverResult underlying;
+public class MoreoverResultSetWrapper extends StreamsResultSet {
 
     public MoreoverResultSetWrapper(MoreoverResult underlying) {
-        this.underlying = underlying;
-    }
-
-    @Override
-    public long getStartTime() {
-        return underlying.getStart();
-    }
-
-    @Override
-    public long getEndTime() {
-        return underlying.getEnd();
+        super((Queue<StreamsDatum>)underlying);
     }
 
-    @Override
-    public String getSourceId() {
-        return underlying.getClientId();
-    }
-
-    @Override
-    public BigInteger getMaxSequence() {
-        return underlying.getMaxSequencedId();
-    }
-
-    @Override
-    public Iterator<StreamsDatum> iterator() {
-        return underlying.iterator();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
index 6950918..d88f28c 100644
--- a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
+++ b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
@@ -3,6 +3,7 @@
     "$schema": "http://json-schema.org/draft-03/schema",
     "id": "#",
     "javaType" : "org.apache.streams.moreover.MoreoverConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
     "properties": {
         "apiKeys": {
             "type": "array",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 9ef0bd6..449f187 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -85,7 +85,8 @@ public class RssStreamProvider implements StreamsProvider, Serializable {
         this.klass = klass;
     }
 
-    public void start() {
+    @Override
+    public void startStream() {
 
         Preconditions.checkNotNull(this.klass);
 
@@ -133,7 +134,7 @@ public class RssStreamProvider implements StreamsProvider, Serializable {
 
     @Override
     public void prepare(Object configurationObject) {
-        start();
+        
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 4a8a51b..b0b4cf4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -90,9 +90,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     }
 
-    public void run() {
+    @Override
+    public void startStream() {
 
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 5; i++) {
             executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
         }
 
@@ -101,7 +102,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     @Override
     public StreamsResultSet readCurrent() {
-        run();
         StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 6fb3e02..40fb961 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -118,6 +118,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 //        }
 //    }
 
+    @Override
+    public void startStream() {
+        // no op
+    }
+
     private void captureTimeline(long currentId) {
 
         Paging paging = new Paging(1, 200);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
index 56878a7..2287c9a 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
@@ -28,11 +28,7 @@ import java.util.Queue;
  */
 public interface StreamsProvider extends StreamsOperation {
 
-//    void start();
-//    void stop();
-//
-//    public Queue<StreamsDatum> getProviderQueue();
-
+    public void startStream();
     public StreamsResultSet readCurrent();
     public StreamsResultSet readNew(BigInteger sequence);
     public StreamsResultSet readRange(DateTime start, DateTime end);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
index 76e925f..0598dfb 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -67,9 +67,17 @@ public class LocalStreamBuilder implements StreamBuilder{
     }
 
     @Override
+    public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) {
+        validateId(id);
+        this.providers.put(id, new StreamComponent(id, provider, true));
+        ++this.totalTasks;
+        return this;
+    }
+
+    @Override
     public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
         validateId(id);
-        this.providers.put(id, new StreamComponent(id, provider));
+        this.providers.put(id, new StreamComponent(id, provider, false));
         ++this.totalTasks;
         return this;
     }
@@ -139,7 +147,7 @@ public class LocalStreamBuilder implements StreamBuilder{
             }
 
             while(isRunning) {
-                isRunning = false;
+                //isRunning = false;
                 for(StreamsProviderTask task : provTasks.values()) {
                     isRunning = isRunning || task.isRunning();
                 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
index 918eb7a..133776d 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
@@ -51,6 +51,15 @@ public interface StreamBuilder {
      * @param provider provider to execute
      * @return this
      */
+    public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
+
+    /**
+     * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream.  The provider will execute
+     * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to produce data.
+     * @param streamId unique if for this provider - must be unique across the entire stream.
+     * @param provider provider to execute
+     * @return this
+     */
     public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
index 6243f94..15cb9e8 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
@@ -33,15 +33,17 @@ public class StreamComponent {
     private DateTime[] dateRange;
     private BigInteger sequence;
     private int numTasks = 1;
+    private boolean perpetual;
 
     /**
      *
      * @param id
      * @param provider
      */
-    public StreamComponent(String id, StreamsProvider provider) {
+    public StreamComponent(String id, StreamsProvider provider, boolean perpetual) {
         this.id = id;
         this.provider = provider;
+        this.perpetual = perpetual;
         initializePrivateVariables();
     }
 
@@ -182,9 +184,14 @@ public class StreamComponent {
             }
         }
         else if(this.provider != null) {
-            StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+            StreamsProvider prov;
+            if(this.numTasks > 1) {
+                prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+            } else {
+                prov = this.provider;
+            }
             if(this.dateRange == null && this.sequence == null)
-                task = new StreamsProviderTask(prov);
+                task = new StreamsProviderTask(prov, this.perpetual);
             else if(this.sequence != null)
                 task = new StreamsProviderTask(prov, this.sequence);
             else

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
index fe83160..07235d1 100644
--- a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
@@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class StreamsProviderTask extends BaseStreamsTask {
 
     private static enum Type {
+        PERPETUAL,
         READ_CURRENT,
         READ_NEW,
         READ_RANGE
@@ -36,9 +37,12 @@ public class StreamsProviderTask extends BaseStreamsTask {
      * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
      * @param provider
      */
-    public StreamsProviderTask(StreamsProvider provider) {
+    public StreamsProviderTask(StreamsProvider provider, boolean perpetual) {
         this.provider = provider;
-        this.type = Type.READ_CURRENT;
+        if( perpetual )
+            this.type = Type.PERPETUAL;
+        else
+            this.type = Type.READ_CURRENT;
         this.keepRunning = new AtomicBoolean(true);
         this.isRunning = new AtomicBoolean(true);
     }
@@ -94,6 +98,19 @@ public class StreamsProviderTask extends BaseStreamsTask {
             StreamsResultSet resultSet = null;
             this.isRunning.set(true);
             switch(this.type) {
+                case PERPETUAL: {
+                    provider.startStream();
+                    while(this.keepRunning.get() == true) {
+                        try {
+                            resultSet = provider.readCurrent();
+                            flushResults(resultSet);
+                            Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                        } catch (InterruptedException e) {
+                            this.keepRunning.set(false);
+                        }
+                    }
+                }
+                    break;
                 case READ_CURRENT: resultSet = this.provider.readCurrent();
                     break;
                 case READ_NEW: resultSet = this.provider.readNew(this.sequence);
@@ -102,20 +119,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
                     break;
                 default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
             }
-            for(StreamsDatum datum : resultSet) {
-                if(!this.keepRunning.get()) {
-                    break;
-                }
-                if(datum != null)
-                 super.addToOutgoingQueue(datum);
-                else {
-                    try {
-                        Thread.sleep(DEFAULT_SLEEP_TIME_MS);
-                    } catch (InterruptedException e) {
-                        this.keepRunning.set(false);
-                    }
-                }
-            }
+            flushResults(resultSet);
 
         } catch( Exception e ) {
             e.printStackTrace();
@@ -129,4 +133,21 @@ public class StreamsProviderTask extends BaseStreamsTask {
     public boolean isRunning() {
         return this.isRunning.get();
     }
+
+    public void flushResults(StreamsResultSet resultSet) {
+        for(StreamsDatum datum : resultSet) {
+            if(!this.keepRunning.get()) {
+                break;
+            }
+            if(datum != null)
+                super.addToOutgoingQueue(datum);
+            else {
+                try {
+                    Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                } catch (InterruptedException e) {
+                    this.keepRunning.set(false);
+                }
+            }
+        }
+    }
 }