You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:18 UTC

[40/71] [abbrv] adding uncommitted core classes, and updates to twitter, es, mongo, hdfs

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
new file mode 100644
index 0000000..30466a3
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProcessorTask.java
@@ -0,0 +1,102 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProcessorTask extends BaseStreamsTask {
+
+
+    private StreamsProcessor processor;
+    private long sleepTime;
+    private AtomicBoolean keepRunning;
+    private Map<String, Object> streamConfig;
+    private Queue<StreamsDatum> inQueue;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Default constructor, uses default sleep time of 500ms when inbound queue is empty
+     * @param processor process to run in task
+     */
+    public StreamsProcessorTask(StreamsProcessor processor) {
+        this(processor, DEFAULT_SLEEP_TIME_MS);
+    }
+
+    /**
+     *
+     * @param processor processor to run in task
+     * @param sleepTime time to sleep when incoming queue is empty
+     */
+    public StreamsProcessorTask(StreamsProcessor processor, long sleepTime) {
+        this.processor = processor;
+        this.sleepTime = sleepTime;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.streamConfig = config;
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        this.inQueue = inputQueue;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.processor.prepare(this.streamConfig);
+            StreamsDatum datum = this.inQueue.poll();
+            while(datum != null || this.keepRunning.get()) {
+                if(datum != null) {
+                    List<StreamsDatum> output = this.processor.process(datum);
+                    if(output != null) {
+                        for(StreamsDatum outDatum : output) {
+                            super.addToOutgoingQueue(outDatum);
+                        }
+                    }
+                }
+                else {
+                    try {
+                        Thread.sleep(this.sleepTime);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+                datum = this.inQueue.poll();
+            }
+
+        } finally {
+            this.processor.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    @Override
+    public List<Queue<StreamsDatum>> getInputQueues() {
+        List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>();
+        queues.add(this.inQueue);
+        return queues;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
new file mode 100644
index 0000000..fe83160
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsProviderTask.java
@@ -0,0 +1,132 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ */
+public class StreamsProviderTask extends BaseStreamsTask {
+
+    private static enum Type {
+        READ_CURRENT,
+        READ_NEW,
+        READ_RANGE
+    }
+
+    private static final int START = 0;
+    private static final int END = 1;
+
+    private StreamsProvider provider;
+    private AtomicBoolean keepRunning;
+    private Type type;
+    private BigInteger sequence;
+    private DateTime[] dateRange;
+    private Map<String, Object> config;
+    private AtomicBoolean isRunning;
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()}
+     * @param provider
+     */
+    public StreamsProviderTask(StreamsProvider provider) {
+        this.provider = provider;
+        this.type = Type.READ_CURRENT;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)}
+     * @param provider
+     * @param sequence
+     */
+    public StreamsProviderTask(StreamsProvider provider, BigInteger sequence) {
+        this.provider = provider;
+        this.type = Type.READ_NEW;
+        this.sequence = sequence;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    /**
+     * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)}
+     * @param provider
+     * @param start
+     * @param end
+     */
+    public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end) {
+        this.provider = provider;
+        this.type = Type.READ_RANGE;
+        this.dateRange = new DateTime[2];
+        this.dateRange[START] = start;
+        this.dateRange[END] = end;
+        this.keepRunning = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+    }
+
+    @Override
+    public void stopTask() {
+        this.keepRunning.set(false);
+    }
+
+    @Override
+    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+        throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()");
+    }
+
+    @Override
+    public void setStreamConfig(Map<String, Object> config) {
+        this.config = config;
+    }
+
+    @Override
+    public void run() {
+        try {
+            this.provider.prepare(this.config); //TODO allow for configuration objects
+            StreamsResultSet resultSet = null;
+            this.isRunning.set(true);
+            switch(this.type) {
+                case READ_CURRENT: resultSet = this.provider.readCurrent();
+                    break;
+                case READ_NEW: resultSet = this.provider.readNew(this.sequence);
+                    break;
+                case READ_RANGE: resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+                    break;
+                default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
+            }
+            for(StreamsDatum datum : resultSet) {
+                if(!this.keepRunning.get()) {
+                    break;
+                }
+                if(datum != null)
+                 super.addToOutgoingQueue(datum);
+                else {
+                    try {
+                        Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                    } catch (InterruptedException e) {
+                        this.keepRunning.set(false);
+                    }
+                }
+            }
+
+        } catch( Exception e ) {
+            e.printStackTrace();
+        } finally
+        {
+            this.provider.cleanUp();
+            this.isRunning.set(false);
+        }
+    }
+
+    public boolean isRunning() {
+        return this.isRunning.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4705fcb7/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
new file mode 100644
index 0000000..6121d83
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/tasks/StreamsTask.java
@@ -0,0 +1,58 @@
+package org.apache.streams.core.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Interface for all task that will be used to execute instances of {@link org.apache.streams.core.StreamsOperation}
+ * in local mode.
+ */
+/**
+ * Contract shared by every task that executes an instance of {@link org.apache.streams.core.StreamsOperation}
+ * in local mode. Each task is a {@link Runnable} wired to the rest of the stream through queues.
+ */
+public interface StreamsTask extends Runnable {
+
+    /** Default idle sleep interval, in milliseconds, used by tasks when no work is available. */
+    long DEFAULT_SLEEP_TIME_MS = 500;
+
+    /**
+     * Requests that the task stop. An implementation may or may not drain its inbound queue before halting.
+     */
+    void stopTask();
+
+    /**
+     * Attaches a {@link java.util.Queue} from which this task reads its input.
+     * @param inputQueue queue supplying input datums
+     */
+    void addInputQueue(Queue<StreamsDatum> inputQueue);
+
+    /**
+     * Attaches a {@link java.util.Queue} to which this task writes its output.
+     * @param outputQueue queue receiving output datums
+     */
+    void addOutputQueue(Queue<StreamsDatum> outputQueue);
+
+    /**
+     * Supplies the configuration that is shared by, and passed to, every StreamsTask instance.
+     * @param config optional configuration information
+     */
+    void setStreamConfig(Map<String, Object> config);
+
+    /**
+     * Tells whether the task is still in progress.
+     * @return true while the task has not completed, false once it has
+     */
+    boolean isRunning();
+
+    /**
+     * Lists the input queues attached to this task.
+     * @return list of input queues
+     */
+    List<Queue<StreamsDatum>> getInputQueues();
+
+    /**
+     * Lists the output queues attached to this task.
+     * @return list of output queues
+     */
+    List<Queue<StreamsDatum>> getOutputQueues();
+
+}