You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/26 20:15:43 UTC
svn commit: r1572204 - in /incubator/streams/branches/STREAMS-26:
streams-contrib/ streams-contrib/streams-persist-elasticsearch/
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/
streams-contrib/streams-pers...
Author: sblackmon
Date: Wed Feb 26 19:15:42 2014
New Revision: 1572204
URL: http://svn.apache.org/r1572204
Log:
adding uncommitted core classes, and updates to twitter, es, mongo, hdfs
Added:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
Removed:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProfileProcessor.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTypeConverter.java
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/pom.xml Wed Feb 26 19:15:42 2014
@@ -40,6 +40,7 @@
<module>streams-persist-elasticsearch</module>
<module>streams-persist-hdfs</module>
<module>streams-persist-kafka</module>
+ <module>streams-persist-mongo</module>
<!--<module>streams-provider-datasift</module>-->
<!--<module>streams-provider-facebook</module>-->
<!--<module>streams-provider-gnip</module>-->
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/pom.xml Wed Feb 26 19:15:42 2014
@@ -74,6 +74,7 @@
<generateBuilders>true</generateBuilders>
<sourcePaths>
<sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.elasticsearch.pojo</targetPackage>
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -116,6 +116,10 @@ public class ElasticsearchPersistWriter
this.config = ElasticsearchConfigurator.detectConfiguration(config);
}
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+ this.config = config;
+ }
+
private static final int BYTES_IN_MB = 1024*1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
private volatile int totalByteCount = 0;
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,150 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * References:
+ * Some helpful references:
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class PercolateProcessor implements StreamsProcessor, Runnable
+{
+ private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ protected Queue<StreamsDatum> inQueue;
+ protected Queue<StreamsDatum> outQueue;
+
+ private ElasticsearchWriterConfiguration config;
+ private ElasticsearchClientManager manager;
+
+ public PercolateProcessor(Queue<StreamsDatum> inQueue) {
+ this.inQueue = inQueue;
+ this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ }
+
+ public ElasticsearchClientManager getManager() {
+ return manager;
+ }
+
+ public void setManager(ElasticsearchClientManager manager) {
+ this.manager = manager;
+ }
+
+ public ElasticsearchWriterConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(ElasticsearchWriterConfiguration config) {
+ this.config = config;
+ }
+
+ public void start() {
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(manager);
+ Preconditions.checkNotNull(manager.getClient());
+ }
+
+ public void stop() {
+
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ List<StreamsDatum> result = Lists.newArrayList();
+
+ String json;
+ ObjectNode node;
+ // first check for valid json
+ if( entry.getDocument() instanceof String ) {
+ json = (String) entry.getDocument();
+ try {
+ node = (ObjectNode) mapper.readTree(json);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ } else {
+ node = (ObjectNode) entry.getDocument();
+ json = node.asText();
+ }
+
+ PercolateResponse response = manager.getClient().preparePercolate(config.getIndex(), config.getType()).setSource(json).execute().actionGet();
+
+ ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+
+ for( String match : response.getMatches())
+ tagArray.add(match);
+
+ // need utility methods for get / create specific node
+ ObjectNode extensions = (ObjectNode) node.get("extensions");
+ ObjectNode w2o = (ObjectNode) extensions.get("w2o");
+ w2o.put("tags", tagArray);
+
+ result.add(entry);
+
+ return result;
+
+ }
+
+ @Override
+ public void prepare(Object o) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ @Override
+ public void run() {
+
+ while(true) {
+ StreamsDatum item;
+ try {
+ item = inQueue.poll();
+
+ Thread.sleep(new Random().nextInt(100));
+
+ for( StreamsDatum entry : process(item)) {
+ outQueue.offer(entry);
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+ }
+}
\ No newline at end of file
Added: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json (added)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json Wed Feb 26 19:15:42 2014
@@ -0,0 +1,18 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration",
+ "extends": {"$ref":"ElasticsearchConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "index": {
+ "type": "string",
+ "description": "Index to write to"
+ },
+ "type": {
+ "type": "string",
+ "description": "Type to write as"
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -18,6 +18,8 @@ import org.slf4j.LoggerFactory;
import org.apache.streams.hdfs.HdfsConfiguration;
+import java.io.Closeable;
+import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
@@ -26,7 +28,7 @@ import java.security.PrivilegedException
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
{
private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
@@ -300,13 +302,13 @@ public class WebHdfsPersistWriter implem
}
}
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
-
- public Queue<StreamsDatum> getPersistQueue() {
- return persistQueue;
- }
+// public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+// this.persistQueue = persistQueue;
+// }
+//
+// public Queue<StreamsDatum> getPersistQueue() {
+// return persistQueue;
+// }
@Override
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java Wed Feb 26 19:15:42 2014
@@ -20,7 +20,7 @@ public class WebHdfsPersistWriterTask im
public void run() {
while(true) {
- if( writer.getPersistQueue().peek() != null ) {
+ if( writer.persistQueue.peek() != null ) {
try {
StreamsDatum entry = writer.persistQueue.remove();
writer.write(entry);
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java Wed Feb 26 19:15:42 2014
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MongoPersistWriter implements StreamsPersistWriter, Runnable
@@ -110,14 +111,12 @@ public class MongoPersistWriter implemen
client.cleanCursors(true);
}
- @Override
public void start() {
connectToMongo();
}
- @Override
public void stop() {
try {
@@ -132,32 +131,39 @@ public class MongoPersistWriter implemen
}
}
- @Override
public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
this.persistQueue = persistQueue;
}
- @Override
public Queue<StreamsDatum> getPersistQueue() {
return persistQueue;
}
-
- @Override
public void run() {
- start();
+ while(true) {
+ if( persistQueue.peek() != null ) {
+ try {
+ StreamsDatum entry = persistQueue.remove();
+ write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException e) {}
+ }
- Thread task = new Thread(new MongoPersistWriterTask(this));
- task.start();
+ }
- try {
- task.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- return;
- }
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+ @Override
+ public void cleanUp() {
stop();
}
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Wed Feb 26 19:15:42 2014
@@ -20,6 +20,7 @@ import org.apache.streams.core.StreamsDa
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.processor.TwitterEventProcessor;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Wed Feb 26 19:15:42 2014
@@ -2,6 +2,10 @@ package org.apache.streams.twitter.provi
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -15,13 +19,14 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
+import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.*;
@@ -29,7 +34,9 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+
+ private final static String STREAMS_ID = "TwitterTimelineProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
@@ -45,11 +52,10 @@ public class TwitterTimelineProvider imp
this.config = config;
}
- protected volatile BlockingQueue<String> inQueue = new LinkedBlockingQueue<String>(10000);
-
protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
protected Twitter client;
+ protected Iterator<Long> ids;
ListenableFuture providerTaskComplete;
//
@@ -92,69 +98,118 @@ public class TwitterTimelineProvider imp
return this.providerQueue;
}
- public void run() {
-
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+// public void run() {
+//
+// LOGGER.info("{} Running", STREAMS_ID);
+//
+// while( ids.hasNext() ) {
+// Long currentId = ids.next();
+// LOGGER.info("Provider Task Starting: {}", currentId);
+// captureTimeline(currentId);
+// }
+//
+// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID);
+//
+// client.shutdown();
+//
+// LOGGER.info("{} Exiting", STREAMS_ID);
+//
+// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
+// try {
+// Thread.sleep(100);
+// } catch (InterruptedException e) {}
+// }
+// }
- Preconditions.checkNotNull(providerQueue);
+ private void captureTimeline(long currentId) {
- Preconditions.checkNotNull(this.klass);
+ Paging paging = new Paging(1, 200);
+ List<Status> statuses = null;
+ boolean KeepGoing = true;
+ boolean hadFailure = false;
+
+ do
+ {
+ int keepTrying = 0;
+
+ // keep trying to load; the loop below currently caps keepTrying at 1 (the earlier cap of 10 is commented out).
+ //while (keepTrying < 10)
+ while (keepTrying < 1)
+ {
+
+ try
+ {
+ statuses = client.getUserTimeline(currentId, paging);
+
+ for (Status tStat : statuses)
+ {
+// if( provider.start != null &&
+// provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
+// {
+// // they hit the last date we wanted to collect
+// // we can now exit early
+// KeepGoing = false;
+// }
+ // emit the record
+ String json = DataObjectFactory.getRawJSON(tStat);
+
+ providerQueue.offer(new StreamsDatum(json));
+
+ }
+
+ paging.setPage(paging.getPage() + 1);
+
+ keepTrying = 10;
+ }
+ catch(TwitterException twitterException) {
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+ }
+ catch(Exception e)
+ {
+ hadFailure = true;
+ keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+ }
+ finally
+ {
+ // Shut down the Twitter client to release its resources
+ client.shutdown();
+ }
+ }
+ }
+ while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+ }
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ public StreamsResultSet readCurrent() {
- Preconditions.checkNotNull(config.getFollow());
+ Preconditions.checkArgument(ids.hasNext());
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
+ LOGGER.info("{} readCurrent", STREAMS_ID);
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
- Iterator<Long> ids = config.getFollow().iterator();
while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
-
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
+ Long currentId = ids.next();
+ LOGGER.info("Provider Task Starting: {}", currentId);
+ captureTimeline(currentId);
}
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
- }
+ LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID);
- @Override
- public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+ StreamsResultSet result = (StreamsResultSet) ImmutableList.copyOf(Iterators.consumingIterator(providerQueue.iterator()));
+ LOGGER.info("{} providing {} docs", STREAMS_ID, providerQueue.size());
+ LOGGER.info("{} Exiting", STREAMS_ID);
return result;
+
}
- @Override
public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
throw new NotImplementedException();
}
- @Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
this.start = start;
this.end = end;
- run();
+ readCurrent();
StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
return result;
}
@@ -181,6 +236,8 @@ public class TwitterTimelineProvider imp
@Override
public void prepare(Object o) {
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
Preconditions.checkNotNull(providerQueue);
Preconditions.checkNotNull(this.klass);
@@ -197,33 +254,29 @@ public class TwitterTimelineProvider imp
Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
- Iterator<Long> ids = config.getFollow().iterator();
- while( ids.hasNext() ) {
- Long id = ids.next();
+ ids = config.getFollow().iterator();
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
+ String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
+ ConfigurationBuilder builder = new ConfigurationBuilder()
+ .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+ .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+ .setOAuthAccessToken(config.getOauth().getAccessToken())
+ .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+ .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+ .setJSONStoreEnabled(jsonStoreEnabled)
+ .setAsyncNumThreads(3)
+ .setRestBaseURL(baseUrl);
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
- }
+ client = new TwitterFactory(builder.build()).getInstance();
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
}
@Override
public void cleanUp() {
+
+ client.shutdown();
+
shutdownAndAwaitTermination(executor);
}
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java?rev=1572204&r1=1572203&r2=1572204&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java Wed Feb 26 19:15:42 2014
@@ -60,7 +60,7 @@ public class TwitterTimelineProviderTask
// emit the record
String json = DataObjectFactory.getRawJSON(tStat);
- provider.inQueue.offer(json);
+ //provider.offer(json);
}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.core;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public interface StreamsOperation extends Serializable {
+
+ /**
+ * This method will be called after initialization/serialization. Initialize any non-serializable objects here.
+ * @param configurationObject Any object to help initialize the operation, e.g. Map, JobContext, Properties, etc. The type
+ * will be based on where the operation is being run (e.g. hadoop, storm, locally, etc.)
+ */
+ public void prepare(Object configurationObject);
+
+ /**
+ * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method
+ * will be made.
+ * Use this method to terminate connections, etc.
+ */
+ public void cleanUp();
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/InvalidStreamException.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,23 @@
+package org.apache.streams.core.builders;
+
+/**
+ * Unchecked exception indicating that a data stream definition is malformed, for example when a component id is
+ * duplicated or refers to a component that does not exist.
+ */
+public class InvalidStreamException extends RuntimeException {
+
+ public InvalidStreamException() {
+ super();
+ }
+
+ public InvalidStreamException(String s) {
+ super(s);
+ }
+
+ public InvalidStreamException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public InvalidStreamException(Throwable throwable) {
+ super(throwable);
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/LocalStreamBuilder.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,256 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link org.apache.streams.core.builders.StreamBuilder} implementation to run a data processing stream in a single
+ * JVM across many threads. Depending on your data stream, the JVM heap may need to be set to a high value. Default
+ * implementation uses unbound {@link java.util.concurrent.ConcurrentLinkedQueue} to connect stream components.
+ */
+public class LocalStreamBuilder implements StreamBuilder{
+
+ private Map<String, StreamComponent> providers;
+ private Map<String, StreamComponent> components;
+ private Queue<StreamsDatum> queue;
+ private Map<String, Object> streamConfig;
+ private ExecutorService executor;
+ private int totalTasks;
+
+ /**
+ * Creates a builder that connects components with {@link java.util.concurrent.ConcurrentLinkedQueue}s and passes
+ * a null stream configuration to the tasks.
+ */
+ public LocalStreamBuilder(){
+ this(new ConcurrentLinkedQueue<StreamsDatum>(), null);
+ }
+
+ /**
+ * Creates a builder that connects components with {@link java.util.concurrent.ConcurrentLinkedQueue}s.
+ * @param streamConfig configuration handed to every task via setStreamConfig when the stream is started
+ */
+ public LocalStreamBuilder(Map<String, Object> streamConfig) {
+ this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig);
+ }
+
+ /**
+ * Creates a builder with a custom queue type and a null stream configuration.
+ * @param queueType prototype queue; it is cloned by serialization to create the inbound queue of each processor
+ * and persist writer added to this builder
+ */
+ public LocalStreamBuilder(Queue<StreamsDatum> queueType) {
+ this(queueType, null);
+ }
+
+ /**
+ * Creates a builder with a custom queue type and stream configuration.
+ * @param queueType prototype queue; it is cloned by serialization to create the inbound queue of each processor
+ * and persist writer added to this builder
+ * @param streamConfig configuration handed to every task via setStreamConfig when the stream is started
+ */
+ public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, Object> streamConfig) {
+ this.queue = queueType;
+ this.providers = new HashMap<String, StreamComponent>();
+ this.components = new HashMap<String, StreamComponent>();
+ this.streamConfig = streamConfig;
+ this.totalTasks = 0;
+ }
+
+ @Override
+ public StreamBuilder newReadCurrentStream(String id, StreamsProvider provider) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder newReadNewStream(String id, StreamsProvider provider, BigInteger sequence) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider, sequence));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder newReadRangeStream(String id, StreamsProvider provider, DateTime start, DateTime end) {
+ validateId(id);
+ this.providers.put(id, new StreamComponent(id, provider, start, end));
+ ++this.totalTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) {
+ validateId(id);
+ StreamComponent comp = new StreamComponent(id, processor, cloneQueue(), numTasks);
+ this.components.put(id, comp);
+ connectToOtherComponents(inBoundIds, comp);
+ this.totalTasks += numTasks;
+ return this;
+ }
+
+ @Override
+ public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
+ validateId(id);
+ StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), numTasks);
+ this.components.put(id, comp);
+ connectToOtherComponents(inBoundIds, comp);
+ this.totalTasks += numTasks;
+ return this;
+ }
+
+ /**
+ * Runs the data stream in this JVM and blocks until completion. Every component and provider task is submitted
+ * to a fixed thread pool sized to the total number of tasks; the calling thread then polls every 10 seconds until
+ * no provider task is running and shuts the remaining tasks down, walking downstream from each provider.
+ * If the calling thread is interrupted, every task (providers included) is asked to stop, the executor is given
+ * 30 seconds to terminate before being forced down, and the thread's interrupt status is restored.
+ * @throws RuntimeException if the thread is interrupted again while waiting for that shutdown
+ */
+ @Override
+ public void start() {
+ boolean isRunning = true;
+ this.executor = Executors.newFixedThreadPool(this.totalTasks);
+ Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
+ Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+ try {
+ for(StreamComponent comp : this.components.values()) {
+ int tasks = comp.getNumTasks();
+ List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+ for(int i=0; i < tasks; ++i) {
+ StreamsTask task = comp.createConnectedTask();
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ compTasks.add(task);
+ }
+ streamsTasks.put(comp.getId(), compTasks);
+ }
+ for(StreamComponent prov : this.providers.values()) {
+ StreamsTask task = prov.createConnectedTask();
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ provTasks.put(prov.getId(), (StreamsProviderTask) task);
+ }
+
+ //poll until no provider is running any more
+ while(isRunning) {
+ isRunning = false;
+ for(StreamsProviderTask task : provTasks.values()) {
+ isRunning = isRunning || task.isRunning();
+ }
+ if(isRunning) {
+ Thread.sleep(10000);
+ }
+ }
+ this.executor.shutdown();
+ //complete stream shut down gracefully
+ for(StreamComponent prov : this.providers.values()) {
+ shutDownTask(prov, streamsTasks);
+ }
+ //need to make this configurable
+ if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ this.executor.shutdownNow();
+ this.executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e){
+ //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
+ //providers have to be stopped too, otherwise they keep producing data that nothing will consume
+ for(StreamsProviderTask task : provTasks.values()) {
+ task.stopTask();
+ }
+ for(List<StreamsTask> tasks : streamsTasks.values()) {
+ for(StreamsTask task : tasks) {
+ task.stopTask();
+ }
+ }
+ this.executor.shutdown();
+ try {
+ if(!this.executor.awaitTermination(30, TimeUnit.SECONDS)){
+ this.executor.shutdownNow();
+ }
+ }catch (InterruptedException ie) {
+ this.executor.shutdownNow();
+ throw new RuntimeException(ie);
+ } finally {
+ //restore the interrupt status only after waiting, so the caller can still observe the interruption
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ }
+
+ /**
+ * Shuts down the running tasks in a pseudo depth-first-search kind of way. A component's tasks are only stopped
+ * once none of the tasks of its upstream components is running; each stopped task is then polled every 500ms until
+ * it reports that it is no longer running. The downstream components are visited afterwards, whether or not this
+ * component was stopped.
+ * @param comp StreamComponent to shut down.
+ * @param streamTasks the list of non-StreamsProvider tasks for this stream.
+ * @throws InterruptedException if the thread is interrupted while waiting for a task to finish
+ */
+ private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
+ List<StreamsTask> tasks = streamTasks.get(comp.getId());
+ if(tasks != null) { //not a StreamProvider
+ boolean parentsShutDown = true;
+ for(StreamComponent parent : comp.getUpStreamComponents()) {
+ List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
+ //if parentTasks == null, it's a provider and is not running anymore
+ if(parentTasks != null) {
+ for(StreamsTask task : parentTasks) {
+ parentsShutDown = parentsShutDown && !task.isRunning();
+ }
+ }
+ }
+ if(parentsShutDown) {
+ for(StreamsTask task : tasks) {
+ task.stopTask();
+ }
+ for(StreamsTask task : tasks) {
+ while(task.isRunning()) {
+ Thread.sleep(500);
+ }
+ }
+ }
+ }
+ Collection<StreamComponent> children = comp.getDownStreamComponents();
+ if(children != null) {
+ //iterate the collection that was just null-checked instead of fetching it a second time
+ for(StreamComponent child : children) {
+ shutDownTask(child, streamTasks);
+ }
+ }
+ }
+
+ /**
+ * NOT IMPLEMENTED.
+ */
+ @Override
+ public void stop() {
+
+ }
+
+ /**
+ * Wires a component to each of the upstream components named by the given ids: the component's inbound queue is
+ * registered as an outbound queue of the upstream component, and the upstream component is recorded as a parent.
+ * Providers are looked up before other components.
+ * @param conntectToIds ids of the upstream components
+ * @param toBeConnected the component that will receive data from them
+ * @throws InvalidStreamException if an id matches neither a provider nor a component
+ */
+ private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) {
+ for(String upStreamId : conntectToIds) {
+ StreamComponent source = this.providers.get(upStreamId);
+ if(source == null) {
+ source = this.components.get(upStreamId);
+ }
+ if(source == null) {
+ throw new InvalidStreamException("Cannot connect to id, "+upStreamId+", because id does not exist.");
+ }
+ source.addOutBoundQueue(toBeConnected, toBeConnected.getInBoundQueue());
+ toBeConnected.addInboundQueue(source);
+ }
+ }
+
+ private void validateId(String id) {
+ if(this.providers.containsKey(id) || this.components.containsKey(id)) {
+ throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component");
+ }
+ }
+
+
+ private Queue<StreamsDatum> cloneQueue() {
+ return (Queue<StreamsDatum>)SerializationUtil.cloneBySerialization(this.queue);
+ }
+
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamBuilder.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,97 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.*;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+
+/**
+ * Interface for building data streams.
+ *
+ * <pre>
+ * StreamBuilder builder = ...
+ * builder.newReadCurrentStream(. . .)
+ * .addStreamsProcessor(. . .)
+ * ...
+ * .addStreamsPersistWriter(. . .);
+ * builder.start();
+ * </pre>
+ *
+ */
+public interface StreamBuilder {
+
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
+ * @param processorId unique id for this processor - must be unique across the entire stream
+ * @param processor the processor to execute
+ * @param numTasks the number of instances of this processor to run concurrently
+ * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation}s that this processor will
+ * receive data from.
+ * @return this
+ */
+ public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
+ * @param persistWriterId unique id for this writer - must be unique across the entire stream
+ * @param writer the writer to execute
+ * @param numTasks the number of instances of this writer to run concurrently
+ * @param connectToIds the ids of the {@link org.apache.streams.core.StreamsOperation}s that this writer will
+ * receive data from.
+ * @return this
+ */
+ public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce data.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @return this
+ */
+ public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)} to produce data.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)} method
+ * @return this
+ */
+ public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+
+ /**
+ * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
+ * {@link org.apache.streams.core.StreamsProvider#readRange(DateTime, DateTime)} to produce data. Whether the start
+ * and end dates are inclusive or exclusive is up to the implementation.
+ * @param streamId unique id for this provider - must be unique across the entire stream.
+ * @param provider provider to execute
+ * @param start start date
+ * @param end end date
+ * @return this
+ */
+ public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+
+ /**
+ * Builds the stream, and starts it or submits it based on implementation.
+ */
+ public void start();
+
+ /**
+ * Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in
+ * all cases.
+ */
+ public void stop();
+
+
+
+
+
+
+
+
+
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/builders/StreamComponent.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,217 @@
+package org.apache.streams.core.builders;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.tasks.StreamsPersistWriterTask;
+import org.apache.streams.core.tasks.StreamsProcessorTask;
+import org.apache.streams.core.tasks.StreamsProviderTask;
+import org.apache.streams.core.tasks.StreamsTask;
+import org.apache.streams.util.SerializationUtil;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * Stores the implementations of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
+ * to and the necessary metadata to construct a data stream.
+ */
+public class StreamComponent {
+
+ //indices into the two-element dateRange array; they must be 0 and 1, because index 2 is out of bounds for
+ //new DateTime[2] and createConnectedTask() reads the range back from positions 0 and 1
+ private static final int START = 0;
+ private static final int END = 1;
+
+ private String id;
+ private Set<StreamComponent> inBound;
+ private Map<StreamComponent, Queue<StreamsDatum>> outBound;
+ private Queue<StreamsDatum> inQueue;
+ private StreamsProvider provider;
+ private StreamsProcessor processor;
+ private StreamsPersistWriter writer;
+ private DateTime[] dateRange;
+ private BigInteger sequence;
+ private int numTasks = 1;
+
+ /**
+ *
+ * @param id
+ * @param provider
+ */
+ public StreamComponent(String id, StreamsProvider provider) {
+ this.id = id;
+ this.provider = provider;
+ initializePrivateVariables();
+ }
+
+ /**
+ *
+ * @param id
+ * @param provider
+ * @param start
+ * @param end
+ */
+ public StreamComponent(String id, StreamsProvider provider, DateTime start, DateTime end) {
+ this.id = id;
+ this.provider = provider;
+ this.dateRange = new DateTime[2];
+ this.dateRange[START] = start;
+ this.dateRange[END] = end;
+ initializePrivateVariables();
+ }
+
+
+ /**
+ * Creates a component for a provider that is started with a sequence.
+ * @param id unique id of this component
+ * @param provider the provider to run
+ * @param sequence the sequence handed to the provider task created for this component
+ */
+ public StreamComponent(String id, StreamsProvider provider, BigInteger sequence) {
+ this.id = id;
+ this.provider = provider;
+ this.sequence = sequence;
+ //without this the inbound/outbound collections stay null and connecting a downstream component
+ //fails with a NullPointerException
+ initializePrivateVariables();
+ }
+
+ /**
+ *
+ * @param id
+ * @param processor
+ * @param inQueue
+ * @param numTasks
+ */
+ public StreamComponent(String id, StreamsProcessor processor, Queue<StreamsDatum> inQueue, int numTasks) {
+ this.id = id;
+ this.processor = processor;
+ this.inQueue = inQueue;
+ this.numTasks = numTasks;
+ initializePrivateVariables();
+ }
+
+ /**
+ *
+ * @param id
+ * @param writer
+ * @param inQueue
+ * @param numTasks
+ */
+ public StreamComponent(String id, StreamsPersistWriter writer, Queue<StreamsDatum> inQueue, int numTasks) {
+ this.id = id;
+ this.writer = writer;
+ this.inQueue = inQueue;
+ this.numTasks = numTasks;
+ initializePrivateVariables();
+ }
+
+ private void initializePrivateVariables() {
+ this.inBound = new HashSet<StreamComponent>();
+ this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>();
+ }
+
+ /**
+ * Add an outbound queue for this component. The queue should be an inbound queue of a downstream component.
+ * @param component the component that this supplying their inbound queue
+ * @param queue the queue to to put post processed/provided datums on
+ */
+ public void addOutBoundQueue(StreamComponent component, Queue<StreamsDatum> queue) {
+ this.outBound.put(component, queue);
+ }
+
+ /**
+ * Add a component that supplies data through the inbound queue.
+ * @param component that supplies data through the inbound queue
+ */
+ public void addInboundQueue(StreamComponent component) {
+ this.inBound.add(component);
+ }
+
+ /**
+ * The components that are immediately downstream of this component (aka child nodes)
+ * @return Collection of child nodes of this component
+ */
+ public Collection<StreamComponent> getDownStreamComponents() {
+ return this.outBound.keySet();
+ }
+
+ /**
+ * The components that are immediately upstream of this component (aka parent nodes)
+ * @return Collection of parent nodes of this component
+ */
+ public Collection<StreamComponent> getUpStreamComponents() {
+ return this.inBound;
+ }
+
+ /**
+ * The inbound queue for this component
+ * @return inbound queue
+ */
+ public Queue<StreamsDatum> getInBoundQueue() {
+ return this.inQueue;
+ }
+
+ /**
+ * The number of tasks this to run this component
+ * @return
+ */
+ public int getNumTasks() {
+ return this.numTasks;
+ }
+
+ /**
+ * Creates a {@link org.apache.streams.core.tasks.StreamsTask} that is running a clone of this component whose
+ * inbound and outbound queues are appropriately connected to the parent and child nodes.
+ * @return StreamsTask for this component
+ */
+ public StreamsTask createConnectedTask() {
+ StreamsTask task;
+ if(this.processor != null) {
+ task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
+ task.addInputQueue(this.inQueue);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ }
+ else if(this.writer != null) {
+ task = new StreamsPersistWriterTask((StreamsPersistWriter) SerializationUtil.cloneBySerialization(this.writer));
+ task.addInputQueue(this.inQueue);
+ }
+ else if(this.provider != null) {
+ StreamsProvider prov = (StreamsProvider)SerializationUtil.cloneBySerialization(this.provider);
+ if(this.dateRange == null && this.sequence == null)
+ task = new StreamsProviderTask(prov);
+ else if(this.sequence != null)
+ task = new StreamsProviderTask(prov, this.sequence);
+ else
+ task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]);
+ for(Queue<StreamsDatum> q : this.outBound.values()) {
+ task.addOutputQueue(q);
+ }
+ }
+ else {
+ throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
+ }
+ return task;
+ }
+
+ /**
+ * The unique of this component
+ * @return
+ */
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if(o instanceof StreamComponent)
+ return this.id.equals(((StreamComponent) o).id);
+ else
+ return false;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/BaseStreamsTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,89 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ *
+ */
+public abstract class BaseStreamsTask implements StreamsTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class);
+
+ private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>();
+ private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>();
+ private int inIndex = 0;
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ this.inQueues.add(inputQueue);
+ }
+
+ @Override
+ public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+ this.outQueues.add(outputQueue);
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getInputQueues() {
+ return this.inQueues;
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getOutputQueues() {
+ return this.outQueues;
+ }
+
+ /**
+ * NOT NECESSARY AT THE MOMENT. MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASKS HAVE MAX
+ * OF 1 INPUT QUEUE.
+ * Round robins through the input queues to get the next StreamsDatum. Returns null if all input queues are empty
+ * or if no input queue has been added.
+ * @return the next StreamsDatum or null if there is nothing to read.
+ */
+ protected StreamsDatum getNextDatum() {
+ //no input queue registered: there is nothing to poll, and indexing into the empty list would throw
+ if(this.inQueues.isEmpty()) {
+ return null;
+ }
+ int startIndex = this.inIndex;
+ int index = startIndex;
+ StreamsDatum datum = null;
+ do {
+ datum = this.inQueues.get(index).poll();
+ index = getNextInputQueueIndex();
+ } while( datum == null && startIndex != index); //stop after one full pass over the queues
+ return datum;
+ }
+
+ /**
+ * Adds a StreamsDatum to the outgoing queues. With exactly one outgoing queue the datum itself is offered; otherwise
+ * a clone created by serialization is offered to each queue. A RuntimeException raised while cloning or offering is
+ * logged and the remaining queues are still served.
+ * @param datum the datum to pass downstream
+ */
+ protected void addToOutgoingQueue(StreamsDatum datum) {
+ if(this.outQueues.size() == 1) {
+ this.outQueues.get(0).offer(datum);
+ }
+ else {
+ for(Queue<StreamsDatum> queue : this.outQueues) {
+ try {
+ queue.offer((StreamsDatum) SerializationUtil.deserialize(SerializationUtil.serialize(datum)));
+ } catch (RuntimeException e) {
+ LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum);
+ //the exception is the trailing argument and gets no placeholder, so SLF4J logs it with its stack trace
+ LOGGER.error("Exception while offering StreamsDatum to outgoing queue", e);
+ }
+ }
+ }
+ }
+
+ private int getNextInputQueueIndex() {
+ ++this.inIndex;
+ if(this.inIndex >= this.inQueues.size()) {
+ this.inIndex = 0;
+ }
+ return this.inIndex;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsMergeTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * NOT USED. When joins/partitions are implemented, a similar pattern could be followed. Done only as basic proof
+ * of concept.
+ * Forwards every datum read from its input queues to its outgoing queues and sleeps while no datum is available.
+ */
+public class StreamsMergeTask extends BaseStreamsTask {
+
+ private AtomicBoolean keepRunning;
+ private AtomicBoolean isRunning;
+ private long sleepTime;
+
+ /**
+ * Creates a merge task that sleeps for the default sleep time when no datum is available.
+ */
+ public StreamsMergeTask() {
+ this(DEFAULT_SLEEP_TIME_MS);
+ }
+
+ /**
+ * @param sleepTime time in milliseconds to sleep when no datum is available
+ */
+ public StreamsMergeTask(long sleepTime) {
+ this.sleepTime = sleepTime;
+ this.keepRunning = new AtomicBoolean(true);
+ //starts out as running, matching the other task implementations
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+ /**
+ * Ignored; this task does not use the stream configuration.
+ */
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+
+ }
+
+ /**
+ * @return true until {@link #run()} has returned
+ */
+ @Override
+ public boolean isRunning() {
+ return this.isRunning.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while(this.keepRunning.get()) {
+ StreamsDatum datum = super.getNextDatum();
+ if(datum != null) {
+ super.addToOutgoingQueue(datum);
+ }
+ else {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ //the loop ends here, so keep the interrupt visible to whoever runs this task
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } finally {
+ this.isRunning.set(false);
+ }
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsPersistWriterTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,103 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsPersistWriterTask extends BaseStreamsTask {
+
+
+
+ private StreamsPersistWriter writer;
+ private long sleepTime;
+ private AtomicBoolean keepRunning;
+ private Map<String, Object> streamConfig;
+ private Queue<StreamsDatum> inQueue;
+ private AtomicBoolean isRunning;
+
+ /**
+ * Default constructor. Uses default sleep of 500ms when inbound queue is empty.
+ * @param writer writer to execute in task
+ */
+ public StreamsPersistWriterTask(StreamsPersistWriter writer) {
+ this(writer, DEFAULT_SLEEP_TIME_MS);
+ }
+
+ /**
+ *
+ * @param writer writer to execute in task
+ * @param sleepTime time to sleep when inbound queue is empty.
+ */
+ public StreamsPersistWriterTask(StreamsPersistWriter writer, long sleepTime) {
+ this.writer = writer;
+ this.sleepTime = sleepTime;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+ this.streamConfig = config;
+ }
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ this.inQueue = inputQueue;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.isRunning.get();
+ }
+
+ /**
+ * Prepares the writer with the stream configuration, then writes every datum polled from the inbound queue. The
+ * loop ends once the task has been stopped (or its sleep was interrupted) and a poll returns no datum. While the
+ * queue is empty the task sleeps for the configured sleep time. The writer is always cleaned up afterwards and the
+ * task is marked as no longer running.
+ */
+ @Override
+ public void run() {
+ try {
+ this.writer.prepare(this.streamConfig);
+ StreamsDatum datum = this.inQueue.poll();
+ while(datum != null || this.keepRunning.get()) {
+ if(datum != null) {
+ this.writer.write(datum);
+ }
+ else {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ datum = this.inQueue.poll();
+ }
+
+ } finally {
+ try {
+ this.writer.cleanUp();
+ } finally {
+ //must happen even if cleanUp() throws, otherwise isRunning() reports true forever
+ this.isRunning.set(false);
+ }
+ }
+ }
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+
+ @Override
+ public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+ throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setOutputQueue()");
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getInputQueues() {
+ List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+ queues.add(this.inQueue);
+ return queues;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,102 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProcessorTask extends BaseStreamsTask {
+
+
+ private StreamsProcessor processor;
+ private long sleepTime;
+ private AtomicBoolean keepRunning;
+ private Map<String, Object> streamConfig;
+ private Queue<StreamsDatum> inQueue;
+ private AtomicBoolean isRunning;
+
+ /**
+ * Default constructor, uses default sleep time of 500ms when inbound queue is empty
+ * @param processor process to run in task
+ */
+ public StreamsProcessorTask(StreamsProcessor processor) {
+ this(processor, DEFAULT_SLEEP_TIME_MS);
+ }
+
+ /**
+ *
+ * @param processor processor to run in task
+ * @param sleepTime time to sleep when incoming queue is empty
+ */
+ public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+ this.processor = processor;
+ this.sleepTime = sleepTime;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+ this.streamConfig = config;
+ }
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ this.inQueue = inputQueue;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.isRunning.get();
+ }
+
+ /**
+ * Prepares the processor with the stream configuration, then processes every datum polled from the inbound queue
+ * and passes each resulting datum to the outgoing queues. The loop ends once the task has been stopped (or its
+ * sleep was interrupted) and a poll returns no datum. While the queue is empty the task sleeps for the configured
+ * sleep time. The processor is always cleaned up afterwards and the task is marked as no longer running.
+ */
+ @Override
+ public void run() {
+ try {
+ this.processor.prepare(this.streamConfig);
+ StreamsDatum datum = this.inQueue.poll();
+ while(datum != null || this.keepRunning.get()) {
+ if(datum != null) {
+ List<StreamsDatum> output = this.processor.process(datum);
+ if(output != null) {
+ for(StreamsDatum outDatum : output) {
+ super.addToOutgoingQueue(outDatum);
+ }
+ }
+ }
+ else {
+ try {
+ Thread.sleep(this.sleepTime);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ datum = this.inQueue.poll();
+ }
+
+ } finally {
+ try {
+ this.processor.cleanUp();
+ } finally {
+ //must happen even if cleanUp() throws, otherwise isRunning() reports true forever
+ this.isRunning.set(false);
+ }
+ }
+ }
+
+ @Override
+ public List<Queue<StreamsDatum>> getInputQueues() {
+ List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+ queues.add(this.inQueue);
+ return queues;
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,132 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProviderTask extends BaseStreamsTask {
+
+ private static enum Type {
+ READ_CURRENT,
+ READ_NEW,
+ READ_RANGE
+ }
+
+ private static final int START = 0;
+ private static final int END = 1;
+
+ private StreamsProvider provider;
+ private AtomicBoolean keepRunning;
+ private Type type;
+ private BigInteger sequence;
+ private DateTime[] dateRange;
+ private Map<String, Object> config;
+ private AtomicBoolean isRunning;
+
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
+ * @param provider
+ */
+ public StreamsProviderTask(StreamsProvider provider) {
+ this.provider = provider;
+ this.type = Type.READ_CURRENT;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
+ * @param provider
+ * @param sequence
+ */
+ public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+ this.provider = provider;
+ this.type = Type.READ_NEW;
+ this.sequence = sequence;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ /**
+ * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
+ * @param provider
+ * @param start
+ * @param end
+ */
+ public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+ this.provider = provider;
+ this.type = Type.READ_RANGE;
+ this.dateRange = new DateTime[2];
+ this.dateRange[START] = start;
+ this.dateRange[END] = end;
+ this.keepRunning = new AtomicBoolean(true);
+ this.isRunning = new AtomicBoolean(true);
+ }
+
+ @Override
+ public void stopTask() {
+ this.keepRunning.set(false);
+ }
+
+ @Override
+ public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+ throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
+ }
+
+ @Override
+ public void setStreamConfig(Map<String, Object> config) {
+ this.config = config;
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.provider.prepare(this.config); //TODO allow for configuration objects
+ StreamsResultSet resultSet = null;
+ this.isRunning.set(true);
+ switch(this.type) {
+ case READ_CURRENT: resultSet = this.provider.readCurrent();
+ break;
+ case READ_NEW: resultSet = this.provider.readNew(this.sequence);
+ break;
+ case READ_RANGE: resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+ break;
+ default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+ }
+ for(StreamsDatum datum : resultSet) {
+ if(!this.keepRunning.get()) {
+ break;
+ }
+ if(datum != null)
+ super.addToOutgoingQueue(datum);
+ else {
+ try {
+ Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+ } catch (InterruptedException e) {
+ this.keepRunning.set(false);
+ }
+ }
+ }
+
+ } catch( Exception e ) {
+ e.printStackTrace();
+ } finally
+ {
+ this.provider.cleanUp();
+ this.isRunning.set(false);
+ }
+ }
+
+ public boolean isRunning() {
+ return this.isRunning.get();
+ }
+}
Added: incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java?rev=1572204&view=auto
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java (added)
+++ incubator/streams/branches/STREAMS-26/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java Wed Feb 26 19:15:42 2014
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Contract for all tasks that are used to execute instances of
+ * {@link org.apache.streams.core.StreamsOperation} in local mode.
+ */
+public interface StreamsTask extends Runnable {
+
+    /** Default sleep time, in milliseconds. */
+    long DEFAULT_SLEEP_TIME_MS = 500;
+
+    /**
+     * Tells the task to stop. A task may or may not try to empty its inbound queue before
+     * halting.
+     */
+    void stopTask();
+
+    /**
+     * Adds an input {@link java.util.Queue} to this task.
+     * @param inputQueue queue the task reads from
+     */
+    void addInputQueue(Queue<StreamsDatum> inputQueue);
+
+    /**
+     * Adds an output {@link java.util.Queue} to this task.
+     * @param outputQueue queue the task writes to
+     */
+    void addOutputQueue(Queue<StreamsDatum> outputQueue);
+
+    /**
+     * Sets the configuration object that will be shared with and passed to all instances of
+     * StreamsTask.
+     * @param config optional configuration information
+     */
+    void setStreamConfig(Map<String, Object> config);
+
+    /**
+     * Indicates whether the task is still running.
+     * @return true while the task has not completed, false otherwise
+     */
+    boolean isRunning();
+
+    /**
+     * Returns the input queues that have been set for this task.
+     * @return list of input queues
+     */
+    List<Queue<StreamsDatum>> getInputQueues();
+
+    /**
+     * Returns the output queues that have been set for this task.
+     * @return list of output queues
+     */
+    List<Queue<StreamsDatum>> getOutputQueues();
+
+}