You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/12 20:08:24 UTC

[01/14] git commit: STREAMS-83 | Updated test providers with new mehtod

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-83 92d8d8669 -> 73ca1e038


STREAMS-83 | Updated test providers with new mehtod


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a8ae1937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a8ae1937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a8ae1937

Branch: refs/heads/STREAMS-83
Commit: a8ae1937f921549ed69b11d644ee3157185495c0
Parents: 92d8d86
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 10:50:35 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 10:50:35 2014 -0500

----------------------------------------------------------------------
 .../streams/local/test/providers/EmptyResultSetProvider.java    | 5 +++++
 .../streams/local/test/providers/NumericMessageProvider.java    | 5 +++++
 .../org/apache/streams/test/component/FileReaderProvider.java   | 5 +++++
 3 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index d8dc4eb..fe60295 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -52,6 +52,11 @@ public class EmptyResultSetProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return false;
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         //NOP
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 987c680..159355d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -59,6 +59,11 @@ public class NumericMessageProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return false;
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 3f84c81..9c1ae97 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -72,6 +72,11 @@ public class FileReaderProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return this.scanner.hasNextLine();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
     }


[05/14] git commit: STREAMS-83 | Updated twitter stream provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated twitter stream provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/82858ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/82858ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/82858ebf

Branch: refs/heads/STREAMS-83
Commit: 82858ebfd0c4027780a92e763bb3b6e9fec950b9
Parents: 8bdb63c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:54:16 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:54:16 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/twitter/provider/TwitterStreamProvider.java  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/82858ebf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 0a81deb..f7c438c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -137,6 +137,11 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
     }
 
     @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    @Override
     public void prepare(Object o) {
 
         executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));


[07/14] git commit: STREAMS-83 | Updated rss provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated rss provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ef483d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ef483d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ef483d47

Branch: refs/heads/STREAMS-83
Commit: ef483d4732be299b8c651caea9ef94a1e8bb4cdc
Parents: 1327b3a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:22:46 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:22:46 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/rss/provider/RssStreamProvider.java   | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ef483d47/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index c9bbe49..02ed1db 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -45,6 +45,7 @@ import java.net.URL;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Created by sblackmon on 12/10/13.
@@ -77,6 +78,8 @@ public class RssStreamProvider implements StreamsProvider {
 
     protected List<SyndFeed> feeds;
 
+    protected final AtomicBoolean running = new AtomicBoolean();
+
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -121,12 +124,15 @@ public class RssStreamProvider implements StreamsProvider {
         for( int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++ )
             executor.submit(new RssEventProcessor(inQueue, providerQueue, klass));
 
+        running.set(true);
+
     }
 
     public void stop() {
         for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
             inQueue.add(RssEventProcessor.TERMINATE);
         }
+        running.set(false);
     }
 
     public Queue<StreamsDatum> getProviderQueue() {
@@ -151,6 +157,11 @@ public class RssStreamProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         
     }


[14/14] git commit: STREAMS-83 | Updated google provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated google provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/73ca1e03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/73ca1e03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/73ca1e03

Branch: refs/heads/STREAMS-83
Commit: 73ca1e038e4cbdd9580aa651af37c4becb75dc22
Parents: b90f7ce
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:53:40 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:53:40 2014 -0500

----------------------------------------------------------------------
 .../main/java/com/google/gmail/provider/GMailProvider.java   | 8 +++++++-
 .../main/java/com/google/gplus/provider/GPlusProvider.java   | 5 +++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73ca1e03/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index f57cf2e..19a6407 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -63,6 +63,7 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
     protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
 
     protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    protected Future task;
 
     public BlockingQueue<Object> getInQueue() {
         return inQueue;
@@ -105,7 +106,7 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
     @Override
     public void startStream() {
 
-        executor.submit(new GMailImapProviderTask(this));
+        task = executor.submit(new GMailImapProviderTask(this));
 
     }
 
@@ -137,6 +138,11 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
     }
 
     @Override
+    public boolean isRunning() {
+        return !task.isDone() && !task.isCancelled();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
 
         Preconditions.checkNotNull(this.klass);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73ca1e03/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
index a5a504b..9257783 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
@@ -148,6 +148,11 @@ public class GPlusProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return !providerTaskComplete.isDone() && !providerTaskComplete.isCancelled();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
 
         Preconditions.checkNotNull(this.klass);


[02/14] git commit: STREAMS-83 | Updated Twitter user information provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated Twitter user information provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/673b38ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/673b38ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/673b38ba

Branch: refs/heads/STREAMS-83
Commit: 673b38ba41212850794847dc71efcd782933fa17
Parents: a8ae193
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:27 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:27 2014 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterUserInformationProvider.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/673b38ba/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index f773e3b..b69c937 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TwitterUserInformationProvider implements StreamsProvider, Serializable
 {
@@ -71,6 +72,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     protected DateTime start;
     protected DateTime end;
 
+    protected final AtomicBoolean running = new AtomicBoolean();
+
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -104,7 +107,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     @Override
     public void startStream() {
-        // no op
+        running.set(true);
     }
 
 
@@ -180,6 +183,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         LOGGER.info("Providing {} docs", providerQueue.size());
 
         StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        running.set(false);
 
         LOGGER.info("Exiting");
 
@@ -201,6 +205,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         return result;
     }
 
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
     void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {


[13/14] git commit: STREAMS-83 | Updated elasticsearch provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated elasticsearch provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b90f7ce5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b90f7ce5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b90f7ce5

Branch: refs/heads/STREAMS-83
Commit: b90f7ce5022880b75ca47716f6f63237186ebc63
Parents: 8d874a9
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:50:24 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:50:24 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/streams/s3/S3PersistReader.java | 6 ++++--
 .../streams/elasticsearch/ElasticsearchPersistReader.java    | 8 +++++++-
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b90f7ce5/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index a73f744..1123dbb 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
@@ -55,6 +56,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     protected DatumStatusCounter countersTotal = new DatumStatusCounter();
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private Future<?> task;
 
     public AmazonS3Client getAmazonS3Client() {
         return this.amazonS3Client;
@@ -78,7 +80,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     @Override
     public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
+        return !task.isDone() && !task.isCancelled();
     }
 
     public DatumStatusCounter getDatumStatusCounter() {
@@ -160,7 +162,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     public void startStream() {
         LOGGER.debug("startStream");
-        executor.submit(new S3PersistReaderTask(this));
+        task = executor.submit(new S3PersistReaderTask(this));
     }
 
     public StreamsResultSet readCurrent() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b90f7ce5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8c0af33..0f91e8a 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -56,6 +56,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     private int threadPoolSize = 10;
     private ExecutorService executor;
     private ReadWriteLock lock = new ReentrantReadWriteLock();
+    private Future<?> readerTask;
 
     public ElasticsearchPersistReader() {
     }
@@ -69,7 +70,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     public void startStream() {
         LOGGER.debug("startStream");
         executor = Executors.newSingleThreadExecutor();
-        executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
+        readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
     }
 
     @Override
@@ -118,6 +119,11 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     }
 
     @Override
+    public boolean isRunning() {
+        return !readerTask.isDone() && !readerTask.isCancelled();
+    }
+
+    @Override
     public void cleanUp() {
         this.shutdownAndAwaitTermination(executor);
         LOGGER.info("PersistReader done");


[12/14] git commit: STREAMS-83 | Updated webhdfs provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated webhdfs provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d874a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d874a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d874a91

Branch: refs/heads/STREAMS-83
Commit: 8d874a913624bc9344109cf07cbaddbfc9510089
Parents: c9e80f5
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:42:18 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:42:18 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReader.java  | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d874a91/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 7bc7f61..1baddc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -67,6 +68,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     protected DatumStatusCounter countersTotal = new DatumStatusCounter();
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private Future<?> task;
 
     public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
@@ -165,7 +167,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void startStream() {
         LOGGER.debug("startStream");
-        executor.submit(new WebHdfsPersistReaderTask(this));
+        task = executor.submit(new WebHdfsPersistReaderTask(this));
     }
 
     @Override
@@ -196,6 +198,11 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     }
 
     @Override
+    public boolean isRunning() {
+        return !task.isDone() && !task.isCancelled();
+    }
+
+    @Override
     public DatumStatusCounter getDatumStatusCounter() {
         return countersTotal;
     }


[11/14] git commit: STREAMS-83 | Updated kafka provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated kafka provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c9e80f5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c9e80f5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c9e80f5d

Branch: refs/heads/STREAMS-83
Commit: c9e80f5de11579fe9356aa2bb60a8b6aefdc483b
Parents: b206852
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:37:20 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:37:20 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/streams/kafka/KafkaPersistReader.java  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c9e80f5d/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index fd49b1d..a7810b1 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -121,6 +121,11 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
         return null;
     }
 
+    @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
         Properties props = new Properties();
         props.put("zookeeper.connect", a_zookeeper);


[10/14] git commit: STREAMS-83 | Updated s3 provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated s3 provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b206852c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b206852c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b206852c

Branch: refs/heads/STREAMS-83
Commit: b206852c78b625adb7637a1f22ed4ee40ffa4674
Parents: 42346da
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:33:21 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:33:21 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/streams/s3/S3PersistReader.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b206852c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 5c7413e..a73f744 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -76,6 +76,11 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
         return null;
     }
 
+    @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
     public DatumStatusCounter getDatumStatusCounter() {
         return countersTotal;
     }


[03/14] git commit: STREAMS-83 | Updated Sysomos information provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated Sysomos information provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/89b191e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/89b191e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/89b191e1

Branch: refs/heads/STREAMS-83
Commit: 89b191e104c94338ccdac6c5fb70dffe1faa38db
Parents: 673b38b
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:55 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:55 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/sysomos/provider/SysomosProvider.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89b191e1/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 0073ac2..7d8a35e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -154,6 +154,11 @@ public class SysomosProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         this.providerQueue = constructQueue();
         if(configurationObject instanceof Map) {
@@ -187,7 +192,7 @@ public class SysomosProvider implements StreamsProvider {
         try {
             this.lock.writeLock().lock();
             this.completedHeartbeats.add(heartbeatId);
-            if(completedHeartbeats.size() == this.getConfig().getHeartbeatIds().size()) {
+            if(!this.isRunning()) {
                 this.cleanUp();
             }
         } finally {


[04/14] git commit: STREAMS-83 | Updated Console provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated Console provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bdb63c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bdb63c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bdb63c5

Branch: refs/heads/STREAMS-83
Commit: 8bdb63c5a244c91ebf9819c2a13726a50edb5c85
Parents: 89b191e
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:40:13 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:40:13 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/console/ConsolePersistReader.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bdb63c5/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index e2eb646..776d5a3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -104,4 +104,9 @@ public class ConsolePersistReader implements StreamsPersistReader {
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return readCurrent();
     }
+
+    @Override
+    public boolean isRunning() {
+        return true;  //Will always be running
+    }
 }


[08/14] git commit: STREAMS-83 | Updated moreover provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated moreover provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bad0f35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bad0f35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bad0f35

Branch: refs/heads/STREAMS-83
Commit: 5bad0f35549f07679f82b827c9a215ac568f0b91
Parents: ef483d4
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:24:51 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:24:51 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/data/moreover/MoreoverProvider.java    | 5 +++++
 .../org/apache/streams/rss/provider/RssStreamProvider.java    | 7 +------
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bad0f35/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 5b53c0c..de5c88a 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -92,6 +92,11 @@ public class MoreoverProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         LOGGER.debug("Prepare");
         executor = Executors.newSingleThreadExecutor();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bad0f35/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 02ed1db..0cc3430 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -78,8 +78,6 @@ public class RssStreamProvider implements StreamsProvider {
 
     protected List<SyndFeed> feeds;
 
-    protected final AtomicBoolean running = new AtomicBoolean();
-
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -124,15 +122,12 @@ public class RssStreamProvider implements StreamsProvider {
         for( int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++ )
             executor.submit(new RssEventProcessor(inQueue, providerQueue, klass));
 
-        running.set(true);
-
     }
 
     public void stop() {
         for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
             inQueue.add(RssEventProcessor.TERMINATE);
         }
-        running.set(false);
     }
 
     public Queue<StreamsDatum> getProviderQueue() {
@@ -158,7 +153,7 @@ public class RssStreamProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return running.get();
+        return !executor.isTerminated() && !executor.isShutdown();
     }
 
     @Override


[06/14] git commit: STREAMS-83 | Updated twitter timeline provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated twitter timeline provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1327b3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1327b3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1327b3a3

Branch: refs/heads/STREAMS-83
Commit: 1327b3a38036e78af427adb5d7b122882ce49cc7
Parents: 82858eb
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:16:47 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:16:47 2014 -0500

----------------------------------------------------------------------
 .../twitter/provider/TwitterTimelineProvider.java       | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1327b3a3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 27ba777..9f9d524 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -42,6 +42,7 @@ import twitter4j.conf.ConfigurationBuilder;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.Iterator;
@@ -86,6 +87,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     protected DateTime start;
     protected DateTime end;
 
+    protected final AtomicBoolean running = new AtomicBoolean();
+
     Boolean jsonStoreEnabled;
     Boolean includeEntitiesEnabled;
 
@@ -225,7 +228,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         } finally {
             lock.writeLock().unlock();
         }
-
+        running.set(false);
         LOGGER.info("Exiting");
 
         return result;
@@ -246,6 +249,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         throw new NotImplementedException();
     }
 
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
     void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {
@@ -269,7 +277,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     public void prepare(Object o) {
 
         executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
+        running.set(true);
         try {
             lock.writeLock().lock();
             providerQueue = constructQueue();


[09/14] git commit: STREAMS-83 | Updated datasift provider with running method

Posted by mf...@apache.org.
STREAMS-83 | Updated datasift provider with running method


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/42346da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/42346da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/42346da2

Branch: refs/heads/STREAMS-83
Commit: 42346da25633fdfd8ac096c36cf182a5d065c804
Parents: 5bad0f3
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:30:41 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:30:41 2014 -0500

----------------------------------------------------------------------
 .../streams/datasift/provider/DatasiftStreamProvider.java       | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/42346da2/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 96466ea..538d8d1 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -190,6 +190,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
     }
 
     @Override
+    public boolean isRunning() {
+        return this.clients != null && this.clients.size() > 0;
+    }
+
+    @Override
     public void prepare(Object configurationObject) {
         this.interactions = new ConcurrentLinkedQueue<Interaction>();
         this.clients = Maps.newHashMap();