You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:27 UTC
[49/71] [abbrv] git commit: Adding support for perpetual streams,
minor refactoring, work on moreover provider
Adding support for perpetual streams, minor refactoring, work on moreover provider
git-svn-id: https://svn.apache.org/repos/asf/incubator/streams/branches/STREAMS-26@1572821 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ce6961d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ce6961d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ce6961d6
Branch: refs/heads/master
Commit: ce6961d6c54ba5367ef3f664c8dccec3448967e6
Parents: 2eb90cd
Author: sblackmon <sb...@unknown>
Authored: Fri Feb 28 05:14:16 2014 +0000
Committer: sblackmon <sb...@unknown>
Committed: Fri Feb 28 05:14:16 2014 +0000
----------------------------------------------------------------------
streams-contrib/pom.xml | 2 +-
.../streams/console/ConsolePersistReader.java | 5 ++
.../ElasticsearchPersistReader.java | 5 ++
.../streams/hdfs/WebHdfsPersistWriter.java | 8 +--
.../streams/kafka/KafkaPersistReader.java | 40 ++++--------
.../streams-provider-moreover/pom.xml | 4 ++
.../streams/data/moreover/MoreoverClient.java | 2 +-
.../streams/data/moreover/MoreoverProvider.java | 66 ++++++++++----------
.../data/moreover/MoreoverResultSetWrapper.java | 31 +--------
.../com/moreover/MoreoverConfiguration.json | 1 +
.../streams/rss/provider/RssStreamProvider.java | 5 +-
.../twitter/provider/TwitterStreamProvider.java | 6 +-
.../provider/TwitterTimelineProvider.java | 5 ++
.../apache/streams/core/StreamsProvider.java | 6 +-
.../core/builders/LocalStreamBuilder.java | 12 +++-
.../streams/core/builders/StreamBuilder.java | 9 +++
.../streams/core/builders/StreamComponent.java | 13 +++-
.../streams/core/tasks/StreamsProviderTask.java | 53 +++++++++++-----
18 files changed, 146 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 07660ee..c24a33b 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -44,7 +44,7 @@
<!--<module>streams-provider-datasift</module>-->
<!--<module>streams-provider-facebook</module>-->
<!--<module>streams-provider-gnip</module>-->
- <!--<module>streams-provider-moreover</module>-->
+ <module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
<!--<module>streams-provider-sysomos</module>-->
<module>streams-provider-rss</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 2addc65..a3967e7 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -45,6 +45,11 @@ public class ConsolePersistReader implements StreamsPersistReader {
}
@Override
+ public void startStream() {
+ // no op
+ }
+
+ @Override
public StreamsResultSet readAll() {
return readCurrent();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 8dbdeb0..2129eca 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -121,6 +121,11 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
}
@Override
+ public void startStream() {
+
+ }
+
+ @Override
public void prepare(Object o) {
// If we haven't already set up the search, then set up the search.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index cb3ceec..0fc8407 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -57,9 +57,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
private ObjectMapper mapper = new ObjectMapper();
- private HdfsConfiguration hdfsConfiguration;
+ private HdfsWriterConfiguration hdfsConfiguration;
- public WebHdfsPersistWriter(HdfsConfiguration hdfsConfiguration) {
+ public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
}
@@ -180,7 +180,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
this.fileLineCounter = 0;
// Create the path for where the file is going to live.
- Path filePath = this.path.suffix("/" + this.filePart + "-" + new Date().getTime() + ".tsv");
+ Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
try
{
@@ -248,7 +248,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
@Override
public void prepare(Object configurationObject) {
connectToWebHDFS();
- path = new Path(hdfsConfiguration.getPath());
+ path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index fd0dfdf..af43d4e 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -29,7 +29,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-public class KafkaPersistReader implements StreamsPersistReader, Serializable, Runnable {
+public class KafkaPersistReader implements StreamsPersistReader, Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
@@ -62,7 +62,9 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
this.config = config;
}
- public void start() {
+ @Override
+ public void startStream() {
+
Properties props = new Properties();
props.setProperty("serializer.encoding", "UTF8");
@@ -81,23 +83,6 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
}
- public void stop() {
- consumerConnector.shutdown();
- while( !executor.isTerminated()) {
- try {
- executor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {}
- }
- }
-
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
-
- public Queue<StreamsDatum> getPersistQueue() {
- return this.persistQueue;
- }
-
@Override
public StreamsResultSet readAll() {
return readCurrent();
@@ -129,20 +114,17 @@ public class KafkaPersistReader implements StreamsPersistReader, Serializable, R
}
@Override
- public void run() {
- start();
-
- // once this class can be told when to shutdown by streams, it will run stop
- // stop();
- }
-
- @Override
public void prepare(Object configurationObject) {
- start();
+
}
@Override
public void cleanUp() {
- stop();
+ consumerConnector.shutdown();
+ while( !executor.isTerminated()) {
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {}
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/pom.xml b/streams-contrib/streams-provider-moreover/pom.xml
index 6b281d9..4988715 100644
--- a/streams-contrib/streams-provider-moreover/pom.xml
+++ b/streams-contrib/streams-provider-moreover/pom.xml
@@ -41,6 +41,10 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index 0ab6e33..ec968fe 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -20,7 +20,7 @@ import java.util.Date;
public class MoreoverClient {
private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
- private static final String BASE_URL = "http://metabase.moreover.com/com.facebook.api/v10/articles?key=%s&limit=%s&sequence_id=%s";
+ private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
private final String id;
private String apiKey;
private BigInteger lastSequenceId = BigInteger.ZERO;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
index 133d6e3..4772f5e 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
@@ -19,57 +19,47 @@ import java.util.concurrent.*;
public class MoreoverProvider implements StreamsProvider {
- private static Logger logger = LoggerFactory.getLogger(MoreoverProvider.class);
+ public final static String STREAMS_ID = "MoreoverProvider";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- private List<ExecutorService> tasks = new LinkedList<ExecutorService>();
private List<MoreoverKeyData> keys;
- private boolean started = false;
+
+ private MoreoverConfiguration config;
+
+ private ExecutorService executor;
public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
+ this.config = moreoverConfiguration;
this.keys = Lists.newArrayList();
- for( MoreoverKeyData apiKey : moreoverConfiguration.getApiKeys()) {
+ for( MoreoverKeyData apiKey : config.getApiKeys()) {
this.keys.add(apiKey);
}
- this.keys = Arrays.asList();
}
- public MoreoverProvider(MoreoverKeyData... keys) {
- this.keys = Arrays.asList(keys);
- }
+ public void startStream() {
- @Override
- public synchronized void start() {
- logger.trace("Starting Producer");
- if(!started) {
- logger.trace("Producer not started. Initializing");
- for(MoreoverKeyData key : keys) {
- MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
- ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
- service.scheduleWithFixedDelay(task, 0, MoreoverProviderTask.LATENCY, TimeUnit.SECONDS);
- logger.info("Started producer for {} with service {}", key.getKey(), service.toString());
- this.tasks.add(service);
- }
- started = true;
+ for(MoreoverKeyData key : keys) {
+ MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
+ executor.submit(new Thread(task));
+ LOGGER.info("Started producer for {}", key.getKey());
}
- }
- @Override
- public synchronized void stop() {
- for(ExecutorService service: tasks) {
- service.shutdown();
- }
- }
-
- @Override
- public Queue<StreamsDatum> getProviderQueue() {
- return providerQueue;
}
@Override
public StreamsResultSet readCurrent() {
- return null;
+ LOGGER.debug("readCurrent");
+
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
+ StreamsResultSet result = new StreamsResultSet(providerQueue);
+
+ LOGGER.info("Exiting");
+
+ return result;
}
@Override
@@ -82,4 +72,14 @@ public class MoreoverProvider implements StreamsProvider {
return null;
}
+ @Override
+ public void prepare(Object configurationObject) {
+ LOGGER.debug("Prepare");
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
index 9c161c7..0785fac 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
@@ -5,37 +5,12 @@ import org.apache.streams.core.StreamsResultSet;
import java.math.BigInteger;
import java.util.Iterator;
+import java.util.Queue;
-public class MoreoverResultSetWrapper implements StreamsResultSet {
-
- private MoreoverResult underlying;
+public class MoreoverResultSetWrapper extends StreamsResultSet {
public MoreoverResultSetWrapper(MoreoverResult underlying) {
- this.underlying = underlying;
- }
-
- @Override
- public long getStartTime() {
- return underlying.getStart();
- }
-
- @Override
- public long getEndTime() {
- return underlying.getEnd();
+ super((Queue<StreamsDatum>)underlying);
}
- @Override
- public String getSourceId() {
- return underlying.getClientId();
- }
-
- @Override
- public BigInteger getMaxSequence() {
- return underlying.getMaxSequencedId();
- }
-
- @Override
- public Iterator<StreamsDatum> iterator() {
- return underlying.iterator();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
index 6950918..d88f28c 100644
--- a/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
+++ b/streams-contrib/streams-provider-moreover/src/main/jsonschema/com/moreover/MoreoverConfiguration.json
@@ -3,6 +3,7 @@
"$schema": "http://json-schema.org/draft-03/schema",
"id": "#",
"javaType" : "org.apache.streams.moreover.MoreoverConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
"properties": {
"apiKeys": {
"type": "array",
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 9ef0bd6..449f187 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -85,7 +85,8 @@ public class RssStreamProvider implements StreamsProvider, Serializable {
this.klass = klass;
}
- public void start() {
+ @Override
+ public void startStream() {
Preconditions.checkNotNull(this.klass);
@@ -133,7 +134,7 @@ public class RssStreamProvider implements StreamsProvider, Serializable {
@Override
public void prepare(Object configurationObject) {
- start();
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 4a8a51b..b0b4cf4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -90,9 +90,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
}
- public void run() {
+ @Override
+ public void startStream() {
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 5; i++) {
executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
}
@@ -101,7 +102,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
@Override
public StreamsResultSet readCurrent() {
- run();
StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 6fb3e02..40fb961 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -118,6 +118,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
// }
// }
+ @Override
+ public void startStream() {
+ // no op
+ }
+
private void captureTimeline(long currentId) {
Paging paging = new Paging(1, 200);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
index 56878a7..2287c9a 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
@@ -28,11 +28,7 @@ import java.util.Queue;
*/
public interface StreamsProvider extends StreamsOperation {
-// void start();
-// void stop();
-//
-// public Queue<StreamsDatum> getProviderQueue();
-
+ public void startStream();
public StreamsResultSet readCurrent();
public StreamsResultSet readNew(BigInteger sequence);
public StreamsResultSet readRange(DateTime start, DateTime end);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
index 76e925f..0598dfb 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
@@ -67,9 +67,17 @@ public class LocalStreamBuilder implements StreamBuilder{
}
@Override
+ public StreamBuilder newPerpetualStream(String id, StreamsProvider provider) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider, true));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
validateId(id);
- this.providers.put(id, new StreamComponent(id, provider));
+ this.providers.put(id, new StreamComponent(id, provider, false));
++this.totalTasks;
return this;
}
@@ -139,7 +147,7 @@ public class LocalStreamBuilder implements StreamBuilder{
}
while(isRunning) {
- isRunning = false;
+ //isRunning = false;
for(StreamsProviderTask task : provTasks.values()) {
isRunning = isRunning || task.isRunning();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
index 918eb7a..133776d 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
@@ -51,6 +51,15 @@ public interface StreamBuilder {
* @param provider provider to execute
* @return this
*/
+ public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce data.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @return this
+ */
public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
index 6243f94..15cb9e8 100644
--- a/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
+++ b/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
@@ -33,15 +33,17 @@ public class StreamComponent {
private DateTime[] dateRange;
private BigInteger sequence;
private int numTasks = 1;
+ private boolean perpetual;
/**
*
* @param id
* @param provider
*/
- public StreamComponent(String id, StreamsProvider provider) {
+ public StreamComponent(String id, StreamsProvider provider, boolean perpetual) {
this.id = id;
this.provider = provider;
+ this.perpetual = perpetual;
initializePrivateVariables();
}
@@ -182,9 +184,14 @@ public class StreamComponent {
}
}
else if(this.provider != null) {
- StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+ StreamsProvider prov;
+ if(this.numTasks > 1) {
+ prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+ } else {
+ prov = this.provider;
+ }
if(this.dateRange == null && this.sequence == null)
- task = new StreamsProviderTask(prov);
+ task = new StreamsProviderTask(prov, this.perpetual);
else if(this.sequence != null)
task = new StreamsProviderTask(prov, this.sequence);
else
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6961d6/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
index fe83160..07235d1 100644
--- a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
@@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class StreamsProviderTask extends BaseStreamsTask {
private static enum Type {
+ PERPETUAL,
READ_CURRENT,
READ_NEW,
READ_RANGE
@@ -36,9 +37,12 @@ public class StreamsProviderTask extends BaseStreamsTask {
* Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
* @param provider
*/
- public StreamsProviderTask(StreamsProvider provider) {
+ public StreamsProviderTask(StreamsProvider provider, boolean perpetual) {
this.provider = provider;
- this.type = Type.READ_CURRENT;
+ if( perpetual )
+ this.type = Type.PERPETUAL;
+ else
+ this.type = Type.READ_CURRENT;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
}
@@ -94,6 +98,19 @@ public class StreamsProviderTask extends BaseStreamsTask {
StreamsResultSet resultSet = null;
this.isRunning.set(true);
switch(this.type) {
+ case PERPETUAL: {
+ provider.startStream();
+ while(this.keepRunning.get() == true) {
+ try {
+ resultSet = provider.readCurrent();
+ flushResults(resultSet);
+ Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ }
+ break;
case READ_CURRENT: resultSet = this.provider.readCurrent();
break;
case READ_NEW: resultSet = this.provider.readNew(this.sequence);
@@ -102,20 +119,7 @@ public class StreamsProviderTask extends BaseStreamsTask {
break;
default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
}
- for(StreamsDatum datum : resultSet) {
- if(!this.keepRunning.get()) {
- break;
- }
- if(datum != null)
- super.addToOutgoingQueue(datum);
- else {
- try {
- Thread.sleep(DEFAULT_SLEEP_TIME_MS);
- } catch (InterruptedException e) {
- this.keepRunning.set(false);
- }
- }
- }
+ flushResults(resultSet);
} catch( Exception e ) {
e.printStackTrace();
@@ -129,4 +133,21 @@ public class StreamsProviderTask extends BaseStreamsTask {
public boolean isRunning() {
return this.isRunning.get();
}
+
+ public void flushResults(StreamsResultSet resultSet) {
+ for(StreamsDatum datum : resultSet) {
+ if(!this.keepRunning.get()) {
+ break;
+ }
+ if(datum != null)
+ super.addToOutgoingQueue(datum);
+ else {
+ try {
+ Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ }
+ }
}