You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:18 UTC
[40/71] [abbrv] adding uncommitted core classes,
and updates to twitter, es, mongo, hdfs
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
new file mode 100644
index 0000000..30466a3
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
@@ -0,0 +1,132 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Task that drives a single {@link StreamsProcessor}: it polls one inbound queue, passes each
+ * datum to the processor and hands every resulting datum to {@code addToOutgoingQueue}.
+ */
+public class StreamsProcessorTask extends BaseStreamsTask {
+
+    private final StreamsProcessor processor;
+    private final long sleepTime;
+    private final AtomicBoolean keepRunning;
+    private Map<String, Object> streamConfig;
+    private Queue<StreamsDatum> inQueue;
+    private final AtomicBoolean isRunning;
+
+    /**
+     * Creates a task that sleeps {@code DEFAULT_SLEEP_TIME_MS} whenever the inbound queue is empty.
+     * @param processor processor to run in task
+     */
+    public StreamsProcessorTask(StreamsProcessor processor) {
+        this(processor, DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     * Creates a task with an explicit idle sleep time.
+     * @param processor processor to run in task
+     * @param sleepTime time in milliseconds to sleep when the incoming queue is empty
+     */
+    public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+        this.processor = processor;
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Requests a stop. {@link #run()} still processes whatever is left in the inbound queue
+     * before it returns.
+     */
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    /**
+     * Stores the configuration that is passed to the processor's {@code prepare} call.
+     * @param config configuration handed to the processor
+     */
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.streamConfig = config;
+    }
+
+    /**
+     * Sets the queue this task reads from. Only one inbound queue is kept; a later call
+     * replaces the queue set by an earlier one.
+     * @param inputQueue queue to poll for incoming data
+     */
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueue = inputQueue;
+    }
+
+    /**
+     * @return true until {@link #run()} has finished, false afterwards
+     */
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    /**
+     * Prepares the processor, then processes data from the inbound queue until a stop has been
+     * requested and the queue is empty. The processor is always cleaned up on exit and the
+     * running flag is cleared even when clean-up itself throws.
+     */
+    @Override
+    public void run() {
+        try {
+            this.processor.prepare(this.streamConfig);
+            StreamsDatum datum = this.inQueue.poll();
+            // A non-null datum is always processed, so the queue is drained after a stop request.
+            while (datum != null || this.keepRunning.get()) {
+                if (datum != null) {
+                    List<StreamsDatum> output = this.processor.process(datum);
+                    if (output != null) {
+                        for (StreamsDatum outDatum : output) {
+                            super.addToOutgoingQueue(outDatum);
+                        }
+                    }
+                } else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        // Interruption is a stop request; restore the flag so the owner of the
+                        // thread can still observe it.
+                        Thread.currentThread().interrupt();
+                        this.keepRunning.set(false);
+                    }
+                }
+                datum = this.inQueue.poll();
+            }
+        } finally {
+            try {
+                this.processor.cleanUp();
+            } finally {
+                // Must run even if cleanUp() throws, otherwise isRunning() stays true forever.
+                this.isRunning.set(false);
+            }
+        }
+    }
+
+    /**
+     * @return a new list holding the single inbound queue of this task
+     */
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+        queues.add(this.inQueue);
+        return queues;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
new file mode 100644
index 0000000..fe83160
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
@@ -0,0 +1,158 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Task that runs a {@link StreamsProvider}: it prepares the provider, performs the read chosen
+ * by the constructor that was used and hands each datum of the result set to
+ * {@code addToOutgoingQueue}.
+ */
+public class StreamsProviderTask extends BaseStreamsTask {
+
+    private static enum Type {
+        READ_CURRENT,
+        READ_NEW,
+        READ_RANGE
+    }
+
+    private static final int START = 0;
+    private static final int END = 1;
+
+    private final StreamsProvider provider;
+    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    private final Type type;
+    private BigInteger sequence;
+    private DateTime[] dateRange;
+    private Map<String, Object> config;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readCurrent()}
+     * @param provider provider to read from
+     */
+    public StreamsProviderTask(StreamsProvider provider) {
+        this.provider = provider;
+        this.type = Type.READ_CURRENT;
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)}
+     * @param provider provider to read from
+     * @param sequence sequence value passed to {@code readNew}
+     */
+    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+        this.provider = provider;
+        this.type = Type.READ_NEW;
+        this.sequence = sequence;
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider#readRange(DateTime,DateTime)}
+     * @param provider provider to read from
+     * @param start first argument passed to {@code readRange}
+     * @param end second argument passed to {@code readRange}
+     */
+    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+        this.provider = provider;
+        this.type = Type.READ_RANGE;
+        this.dateRange = new DateTime[2];
+        this.dateRange[START] = start;
+        this.dateRange[END] = end;
+    }
+
+    /**
+     * Requests a stop; {@link #run()} checks the request before handling each datum.
+     */
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    /**
+     * Not supported by this task.
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - addInputQueue()");
+    }
+
+    /**
+     * Stores the configuration that is passed to the provider's {@code prepare} call.
+     * @param config configuration handed to the provider
+     */
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    /**
+     * Prepares the provider, performs the configured read and forwards the non-null data of the
+     * result set until it is exhausted or a stop is requested. Exceptions raised before clean-up
+     * are printed rather than rethrown; the provider is always cleaned up and the running flag is
+     * cleared even when clean-up itself throws.
+     */
+    @Override
+    public void run() {
+        try {
+            this.provider.prepare(this.config); //TODO allow for configuration objects
+            StreamsResultSet resultSet;
+            this.isRunning.set(true);
+            switch (this.type) {
+                case READ_CURRENT:
+                    resultSet = this.provider.readCurrent();
+                    break;
+                case READ_NEW:
+                    resultSet = this.provider.readNew(this.sequence);
+                    break;
+                case READ_RANGE:
+                    resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+                    break;
+                default:
+                    throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+            }
+            for (StreamsDatum datum : resultSet) {
+                if (!this.keepRunning.get()) {
+                    break;
+                }
+                if (datum != null) {
+                    super.addToOutgoingQueue(datum);
+                } else {
+                    try {
+                        Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                    } catch (InterruptedException e) {
+                        // Interruption is a stop request; restore the flag so the owner of the
+                        // thread can still observe it.
+                        Thread.currentThread().interrupt();
+                        this.keepRunning.set(false);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                this.provider.cleanUp();
+            } finally {
+                // Must run even if cleanUp() throws, otherwise isRunning() stays true forever.
+                this.isRunning.set(false);
+            }
+        }
+    }
+
+    /**
+     * @return true until {@link #run()} has finished, false afterwards
+     */
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
new file mode 100644
index 0000000..6121d83
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
@@ -0,0 +1,59 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Contract shared by every task that executes an instance of
+ * {@link org.apache.streams.core.StreamsOperation} in local mode.
+ */
+public interface StreamsTask extends Runnable {
+
+    /** Default sleep time in milliseconds. */
+    long DEFAULT_SLEEP_TIME_MS = 500;
+
+    /**
+     * Asks the task to stop. A task may or may not empty its inbound queue before halting.
+     */
+    void stopTask();
+
+    /**
+     * Registers an input {@link java.util.Queue} with this task.
+     * @param inputQueue queue the task reads from
+     */
+    void addInputQueue(Queue<StreamsDatum> inputQueue);
+
+    /**
+     * Registers an output {@link java.util.Queue} with this task.
+     * @param outputQueue queue the task writes to
+     */
+    void addOutputQueue(Queue<StreamsDatum> outputQueue);
+
+    /**
+     * Sets the configuration object that is shared by and passed to all StreamsTask instances.
+     * @param config optional configuration information
+     */
+    void setStreamConfig(Map<String, Object> config);
+
+    /**
+     * Reports whether the task is still active.
+     * @return true while the task has not completed, false otherwise
+     */
+    boolean isRunning();
+
+    /**
+     * Lists the input queues that have been set for this task.
+     * @return list of input queues
+     */
+    List<Queue<StreamsDatum>> getInputQueues();
+
+    /**
+     * Lists the output queues that have been set for this task.
+     * @return list of output queues
+     */
+    List<Queue<StreamsDatum>> getOutputQueues();
+
+}