You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/06/17 21:45:44 UTC
[01/15] git commit: STREAMS-83 | Updated interface with isRunning
method
Repository: incubator-streams
Updated Branches:
refs/heads/master ec2ae3518 -> 73ca1e038
STREAMS-83 | Updated interface with isRunning method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/92d8d866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/92d8d866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/92d8d866
Branch: refs/heads/master
Commit: 92d8d8669a077c884deee635e48728e0b089194e
Parents: ec2ae35
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 10:36:59 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 10:36:59 2014 -0500
----------------------------------------------------------------------
.../apache/streams/core/StreamsProvider.java | 38 +++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/92d8d866/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
index 2287c9a..1e6cf12 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
@@ -24,13 +24,41 @@ import java.math.BigInteger;
import java.util.Queue;
/**
- * Created by sblackmon on 12/13/13.
+ * A StreamsProvider represents the entry point into the Streams pipeline. Providers are responsible for inserting
+ * data into the pipeline in discrete result sets.
*/
public interface StreamsProvider extends StreamsOperation {
- public void startStream();
- public StreamsResultSet readCurrent();
- public StreamsResultSet readNew(BigInteger sequence);
- public StreamsResultSet readRange(DateTime start, DateTime end);
+ /**
+ * Start the operation of the stream
+ */
+ void startStream();
+ /**
+ * Read the current items available from the provider
+ * @return a non-null {@link org.apache.streams.core.StreamsResultSet}
+ */
+ StreamsResultSet readCurrent();
+
+ /**
+ * TODO: Define how this operates or eliminate
+ * @param sequence
+ * @return
+ */
+ StreamsResultSet readNew(BigInteger sequence);
+
+ /**
+ * TODO: Define how this operates or eliminate
+ * @param start
+ * @param end
+ * @return
+ */
+ StreamsResultSet readRange(DateTime start, DateTime end);
+
+ /**
+ * Flag to indicate whether the provider is producing data
+ * @return true if the provider is actively awaiting or producing data. False otherwise.
+ */
+ boolean isRunning();
}
+
[09/15] git commit: STREAMS-83 | Updated moreover provider with
running method
Posted by mf...@apache.org.
STREAMS-83 | Updated moreover provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bad0f35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bad0f35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bad0f35
Branch: refs/heads/master
Commit: 5bad0f35549f07679f82b827c9a215ac568f0b91
Parents: ef483d4
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:24:51 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:24:51 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/data/moreover/MoreoverProvider.java | 5 +++++
.../org/apache/streams/rss/provider/RssStreamProvider.java | 7 +------
2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bad0f35/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 5b53c0c..de5c88a 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -92,6 +92,11 @@ public class MoreoverProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return !executor.isShutdown() && !executor.isTerminated();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
LOGGER.debug("Prepare");
executor = Executors.newSingleThreadExecutor();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bad0f35/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 02ed1db..0cc3430 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -78,8 +78,6 @@ public class RssStreamProvider implements StreamsProvider {
protected List<SyndFeed> feeds;
- protected final AtomicBoolean running = new AtomicBoolean();
-
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@ -124,15 +122,12 @@ public class RssStreamProvider implements StreamsProvider {
for( int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++ )
executor.submit(new RssEventProcessor(inQueue, providerQueue, klass));
- running.set(true);
-
}
public void stop() {
for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
inQueue.add(RssEventProcessor.TERMINATE);
}
- running.set(false);
}
public Queue<StreamsDatum> getProviderQueue() {
@@ -158,7 +153,7 @@ public class RssStreamProvider implements StreamsProvider {
@Override
public boolean isRunning() {
- return running.get();
+ return !executor.isTerminated() && !executor.isShutdown();
}
@Override
[05/15] git commit: STREAMS-83 | Updated Console provider with
running method
Posted by mf...@apache.org.
STREAMS-83 | Updated Console provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bdb63c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bdb63c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bdb63c5
Branch: refs/heads/master
Commit: 8bdb63c5a244c91ebf9819c2a13726a50edb5c85
Parents: 89b191e
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:40:13 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:40:13 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/console/ConsolePersistReader.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bdb63c5/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index e2eb646..776d5a3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -104,4 +104,9 @@ public class ConsolePersistReader implements StreamsPersistReader {
public StreamsResultSet readRange(DateTime start, DateTime end) {
return readCurrent();
}
+
+ @Override
+ public boolean isRunning() {
+ return true; //Will always be running
+ }
}
[15/15] git commit: STREAMS-83 | Updated google provider with running
method
Posted by mf...@apache.org.
STREAMS-83 | Updated google provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/73ca1e03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/73ca1e03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/73ca1e03
Branch: refs/heads/master
Commit: 73ca1e038e4cbdd9580aa651af37c4becb75dc22
Parents: b90f7ce
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:53:40 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:53:40 2014 -0500
----------------------------------------------------------------------
.../main/java/com/google/gmail/provider/GMailProvider.java | 8 +++++++-
.../main/java/com/google/gplus/provider/GPlusProvider.java | 5 +++++
2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73ca1e03/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index f57cf2e..19a6407 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -63,6 +63,7 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ protected Future task;
public BlockingQueue<Object> getInQueue() {
return inQueue;
@@ -105,7 +106,7 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
@Override
public void startStream() {
- executor.submit(new GMailImapProviderTask(this));
+ task = executor.submit(new GMailImapProviderTask(this));
}
@@ -137,6 +138,11 @@ public class GMailProvider implements StreamsProvider, DatumStatusCountable {
}
@Override
+ public boolean isRunning() {
+ return !task.isDone() && !task.isCancelled();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
Preconditions.checkNotNull(this.klass);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73ca1e03/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
index a5a504b..9257783 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusProvider.java
@@ -148,6 +148,11 @@ public class GPlusProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return !providerTaskComplete.isDone() && !providerTaskComplete.isCancelled();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
Preconditions.checkNotNull(this.klass);
[02/15] git commit: STREAMS-83 | Updated test providers with new
method
Posted by mf...@apache.org.
STREAMS-83 | Updated test providers with new method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a8ae1937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a8ae1937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a8ae1937
Branch: refs/heads/master
Commit: a8ae1937f921549ed69b11d644ee3157185495c0
Parents: 92d8d86
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 10:50:35 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 10:50:35 2014 -0500
----------------------------------------------------------------------
.../streams/local/test/providers/EmptyResultSetProvider.java | 5 +++++
.../streams/local/test/providers/NumericMessageProvider.java | 5 +++++
.../org/apache/streams/test/component/FileReaderProvider.java | 5 +++++
3 files changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index d8dc4eb..fe60295 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -52,6 +52,11 @@ public class EmptyResultSetProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
public void prepare(Object configurationObject) {
//NOP
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 987c680..159355d 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -59,6 +59,11 @@ public class NumericMessageProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
public void prepare(Object configurationObject) {
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a8ae1937/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 3f84c81..9c1ae97 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -72,6 +72,11 @@ public class FileReaderProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return this.scanner.hasNextLine();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
this.scanner = new Scanner(FileReaderProvider.class.getResourceAsStream(this.fileName));
}
[07/15] git commit: STREAMS-83 | Updated twitter timeline provider
with running method
Posted by mf...@apache.org.
STREAMS-83 | Updated twitter timeline provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1327b3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1327b3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1327b3a3
Branch: refs/heads/master
Commit: 1327b3a38036e78af427adb5d7b122882ce49cc7
Parents: 82858eb
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:16:47 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:16:47 2014 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterTimelineProvider.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1327b3a3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 27ba777..9f9d524 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -42,6 +42,7 @@ import twitter4j.conf.ConfigurationBuilder;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Iterator;
@@ -86,6 +87,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
protected DateTime start;
protected DateTime end;
+ protected final AtomicBoolean running = new AtomicBoolean();
+
Boolean jsonStoreEnabled;
Boolean includeEntitiesEnabled;
@@ -225,7 +228,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
} finally {
lock.writeLock().unlock();
}
-
+ running.set(false);
LOGGER.info("Exiting");
return result;
@@ -246,6 +249,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
throw new NotImplementedException();
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
@@ -269,7 +277,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
public void prepare(Object o) {
executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
+ running.set(true);
try {
lock.writeLock().lock();
providerQueue = constructQueue();
[13/15] git commit: STREAMS-83 | Updated webhdfs provider with
running method
Posted by mf...@apache.org.
STREAMS-83 | Updated webhdfs provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8d874a91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8d874a91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8d874a91
Branch: refs/heads/master
Commit: 8d874a913624bc9344109cf07cbaddbfc9510089
Parents: c9e80f5
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:42:18 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:42:18 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/hdfs/WebHdfsPersistReader.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8d874a91/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 7bc7f61..1baddc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -67,6 +68,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected DatumStatusCounter countersTotal = new DatumStatusCounter();
protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private Future<?> task;
public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
@@ -165,7 +167,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void startStream() {
LOGGER.debug("startStream");
- executor.submit(new WebHdfsPersistReaderTask(this));
+ task = executor.submit(new WebHdfsPersistReaderTask(this));
}
@Override
@@ -196,6 +198,11 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
}
@Override
+ public boolean isRunning() {
+ return !task.isDone() && !task.isCancelled();
+ }
+
+ @Override
public DatumStatusCounter getDatumStatusCounter() {
return countersTotal;
}
[06/15] git commit: STREAMS-83 | Updated twitter stream provider with
running method
Posted by mf...@apache.org.
STREAMS-83 | Updated twitter stream provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/82858ebf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/82858ebf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/82858ebf
Branch: refs/heads/master
Commit: 82858ebfd0c4027780a92e763bb3b6e9fec950b9
Parents: 8bdb63c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:54:16 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:54:16 2014 -0500
----------------------------------------------------------------------
.../apache/streams/twitter/provider/TwitterStreamProvider.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/82858ebf/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 0a81deb..f7c438c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -137,6 +137,11 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
}
@Override
+ public boolean isRunning() {
+ return !executor.isShutdown() && !executor.isTerminated();
+ }
+
+ @Override
public void prepare(Object o) {
executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
[12/15] git commit: STREAMS-83 | Updated kafka provider with running
method
Posted by mf...@apache.org.
STREAMS-83 | Updated kafka provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c9e80f5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c9e80f5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c9e80f5d
Branch: refs/heads/master
Commit: c9e80f5de11579fe9356aa2bb60a8b6aefdc483b
Parents: b206852
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:37:20 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:37:20 2014 -0500
----------------------------------------------------------------------
.../main/java/org/apache/streams/kafka/KafkaPersistReader.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c9e80f5d/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index fd49b1d..a7810b1 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -121,6 +121,11 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable {
return null;
}
+ @Override
+ public boolean isRunning() {
+ return !executor.isShutdown() && !executor.isTerminated();
+ }
+
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
[11/15] git commit: STREAMS-83 | Updated s3 provider with running
method
Posted by mf...@apache.org.
STREAMS-83 | Updated s3 provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b206852c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b206852c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b206852c
Branch: refs/heads/master
Commit: b206852c78b625adb7637a1f22ed4ee40ffa4674
Parents: 42346da
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:33:21 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:33:21 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/streams/s3/S3PersistReader.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b206852c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 5c7413e..a73f744 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -76,6 +76,11 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
return null;
}
+ @Override
+ public boolean isRunning() {
+ return !executor.isShutdown() && !executor.isTerminated();
+ }
+
public DatumStatusCounter getDatumStatusCounter() {
return countersTotal;
}
[08/15] git commit: STREAMS-83 | Updated rss provider with running
method
Posted by mf...@apache.org.
STREAMS-83 | Updated rss provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ef483d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ef483d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ef483d47
Branch: refs/heads/master
Commit: ef483d4732be299b8c651caea9ef94a1e8bb4cdc
Parents: 1327b3a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:22:46 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:22:46 2014 -0500
----------------------------------------------------------------------
.../apache/streams/rss/provider/RssStreamProvider.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ef483d47/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index c9bbe49..02ed1db 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -45,6 +45,7 @@ import java.net.URL;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by sblackmon on 12/10/13.
@@ -77,6 +78,8 @@ public class RssStreamProvider implements StreamsProvider {
protected List<SyndFeed> feeds;
+ protected final AtomicBoolean running = new AtomicBoolean();
+
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@ -121,12 +124,15 @@ public class RssStreamProvider implements StreamsProvider {
for( int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++ )
executor.submit(new RssEventProcessor(inQueue, providerQueue, klass));
+ running.set(true);
+
}
public void stop() {
for (int i = 0; i < ((config.getFeeds().size() / 5) + 1); i++) {
inQueue.add(RssEventProcessor.TERMINATE);
}
+ running.set(false);
}
public Queue<StreamsDatum> getProviderQueue() {
@@ -151,6 +157,11 @@ public class RssStreamProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
}
[14/15] git commit: STREAMS-83 | Updated elasticsearch provider with running method
Posted by mf...@apache.org.
STREAMS-83 | Updated elasticsearch provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b90f7ce5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b90f7ce5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b90f7ce5
Branch: refs/heads/master
Commit: b90f7ce5022880b75ca47716f6f63237186ebc63
Parents: 8d874a9
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:50:24 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:50:24 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/streams/s3/S3PersistReader.java | 6 ++++--
.../streams/elasticsearch/ElasticsearchPersistReader.java | 8 +++++++-
2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b90f7ce5/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index a73f744..1123dbb 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
@@ -55,6 +56,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
protected DatumStatusCounter countersTotal = new DatumStatusCounter();
protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private Future<?> task;
public AmazonS3Client getAmazonS3Client() {
return this.amazonS3Client;
@@ -78,7 +80,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
@Override
public boolean isRunning() {
- return !executor.isShutdown() && !executor.isTerminated();
+ return !task.isDone() && !task.isCancelled();
}
public DatumStatusCounter getDatumStatusCounter() {
@@ -160,7 +162,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
public void startStream() {
LOGGER.debug("startStream");
- executor.submit(new S3PersistReaderTask(this));
+ task = executor.submit(new S3PersistReaderTask(this));
}
public StreamsResultSet readCurrent() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b90f7ce5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8c0af33..0f91e8a 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -56,6 +56,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
private int threadPoolSize = 10;
private ExecutorService executor;
private ReadWriteLock lock = new ReentrantReadWriteLock();
+ private Future<?> readerTask;
public ElasticsearchPersistReader() {
}
@@ -69,7 +70,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
public void startStream() {
LOGGER.debug("startStream");
executor = Executors.newSingleThreadExecutor();
- executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
+ readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
}
@Override
@@ -118,6 +119,11 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
}
@Override
+ public boolean isRunning() {
+ return !readerTask.isDone() && !readerTask.isCancelled();
+ }
+
+ @Override
public void cleanUp() {
this.shutdownAndAwaitTermination(executor);
LOGGER.info("PersistReader done");
[10/15] git commit: STREAMS-83 | Updated datasift provider with running method
Posted by mf...@apache.org.
STREAMS-83 | Updated datasift provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/42346da2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/42346da2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/42346da2
Branch: refs/heads/master
Commit: 42346da25633fdfd8ac096c36cf182a5d065c804
Parents: 5bad0f3
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 12:30:41 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 12:30:41 2014 -0500
----------------------------------------------------------------------
.../streams/datasift/provider/DatasiftStreamProvider.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/42346da2/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
index 96466ea..538d8d1 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java
@@ -190,6 +190,11 @@ public class DatasiftStreamProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return this.clients != null && this.clients.size() > 0;
+ }
+
+ @Override
public void prepare(Object configurationObject) {
this.interactions = new ConcurrentLinkedQueue<Interaction>();
this.clients = Maps.newHashMap();
[03/15] git commit: STREAMS-83 | Updated Twitter user information provider with running method
Posted by mf...@apache.org.
STREAMS-83 | Updated Twitter user information provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/673b38ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/673b38ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/673b38ba
Branch: refs/heads/master
Commit: 673b38ba41212850794847dc71efcd782933fa17
Parents: a8ae193
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:27 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:27 2014 -0500
----------------------------------------------------------------------
.../twitter/provider/TwitterUserInformationProvider.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/673b38ba/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index f773e3b..b69c937 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TwitterUserInformationProvider implements StreamsProvider, Serializable
{
@@ -71,6 +72,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
protected DateTime start;
protected DateTime end;
+ protected final AtomicBoolean running = new AtomicBoolean();
+
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@ -104,7 +107,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
@Override
public void startStream() {
- // no op
+ running.set(true);
}
@@ -180,6 +183,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
LOGGER.info("Providing {} docs", providerQueue.size());
StreamsResultSet result = new StreamsResultSet(providerQueue);
+ running.set(false);
LOGGER.info("Exiting");
@@ -201,6 +205,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
return result;
}
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
[04/15] git commit: STREAMS-83 | Updated Sysomos information provider with running method
Posted by mf...@apache.org.
STREAMS-83 | Updated Sysomos information provider with running method
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/89b191e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/89b191e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/89b191e1
Branch: refs/heads/master
Commit: 89b191e104c94338ccdac6c5fb70dffe1faa38db
Parents: 673b38b
Author: mfranklin <mf...@apache.org>
Authored: Thu Jun 12 11:39:55 2014 -0500
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jun 12 11:39:55 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/sysomos/provider/SysomosProvider.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/89b191e1/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 0073ac2..7d8a35e 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -154,6 +154,11 @@ public class SysomosProvider implements StreamsProvider {
}
@Override
+ public boolean isRunning() {
+ return completedHeartbeats.size() < this.getConfig().getHeartbeatIds().size();
+ }
+
+ @Override
public void prepare(Object configurationObject) {
this.providerQueue = constructQueue();
if(configurationObject instanceof Map) {
@@ -187,7 +192,7 @@ public class SysomosProvider implements StreamsProvider {
try {
this.lock.writeLock().lock();
this.completedHeartbeats.add(heartbeatId);
- if(completedHeartbeats.size() == this.getConfig().getHeartbeatIds().size()) {
+ if(!this.isRunning()) {
this.cleanUp();
}
} finally {