You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/29 16:14:11 UTC

[3/3] git commit: Added configurable timeout to Providers

Added configurable timeout to Providers


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/85510a76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/85510a76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/85510a76

Branch: refs/heads/master
Commit: 85510a760d6fa3ab4b8d6afd833d220575d30f1e
Parents: fdba6ff
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 29 08:49:21 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 29 10:13:59 2014 -0400

----------------------------------------------------------------------
 .../streams/local/builders/LocalStreamBuilder.java    | 11 +++++++++--
 .../streams/local/builders/StreamComponent.java       |  8 +++++++-
 .../streams/local/tasks/StreamsProviderTask.java      | 14 ++++++++++++--
 3 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index bf1abe6..8e688ba 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class LocalStreamBuilder implements StreamBuilder {
 
+    public static final String TIMEOUT_KEY = "TIMEOUT";
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;
     private Queue<StreamsDatum> queue;
@@ -150,7 +151,7 @@ public class LocalStreamBuilder implements StreamBuilder {
                 int tasks = comp.getNumTasks();
                 List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
                 for(int i=0; i < tasks; ++i) {
-                    StreamsTask task = comp.createConnectedTask();
+                    StreamsTask task = comp.createConnectedTask(getTimeout());
                     task.setStreamConfig(this.streamConfig);
                     this.executor.submit(task);
                     compTasks.add(task);
@@ -162,7 +163,7 @@ public class LocalStreamBuilder implements StreamBuilder {
                 streamsTasks.put(comp.getId(), compTasks);
             }
             for(StreamComponent prov : this.providers.values()) {
-                StreamsTask task = prov.createConnectedTask();
+                StreamsTask task = prov.createConnectedTask(getTimeout());
                 task.setStreamConfig(this.streamConfig);
                 this.executor.submit(task);
                 provTasks.put(prov.getId(), (StreamsProviderTask) task);
@@ -176,6 +177,9 @@ public class LocalStreamBuilder implements StreamBuilder {
                 for(StreamsProviderTask task : provTasks.values()) {
                     isRunning = isRunning || task.isRunning();
                 }
+                for(StreamComponent task: components.values()) {
+                    isRunning = isRunning || task.getInBoundQueue().size() > 0;
+                }
                 if(isRunning) {
                     Thread.sleep(3000);
                 }
@@ -295,5 +299,8 @@ public class LocalStreamBuilder implements StreamBuilder {
         return (Queue<StreamsDatum>) SerializationUtil.cloneBySerialization(this.queue);
     }
 
+    protected int getTimeout() {
+        return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : 3000;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index f5e9978..5508cb8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -160,9 +160,11 @@ public class StreamComponent {
     /**
      * Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is running a clone of this component whose
      * inbound and outbound queues are appropriately connected to the parent and child nodes.
+     *
      * @return StreamsTask for this component
+     * @param timeout The timeout to use in milliseconds for any tasks that support configurable timeout
      */
-    public StreamsTask createConnectedTask() {
+    public StreamsTask createConnectedTask(int timeout) {
         StreamsTask task;
         if(this.processor != null) {
             if(this.numTasks > 1) {
@@ -201,6 +203,10 @@ public class StreamComponent {
                 task = new StreamsProviderTask(prov, this.sequence);
             else
                 task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]);
+            //Adjust the timeout if necessary
+            if(timeout > 0) {
+                ((StreamsProviderTask)task).setTimeout(timeout);
+            }
             for(Queue<StreamsDatum> q : this.outBound.values()) {
                 task.addOutputQueue(q);
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 5cf515c..add44bb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -42,6 +42,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private Map<String, Object> config;
     private AtomicBoolean isRunning;
 
+    private int timeout;
     private int zeros = 0;
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -57,6 +58,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             this.type = Type.READ_CURRENT;
         this.keepRunning = new AtomicBoolean(true);
         this.isRunning = new AtomicBoolean(true);
+        this.timeout = DEFAULT_TIMEOUT_MS;
     }
 
     /**
@@ -70,6 +72,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.sequence = sequence;
         this.keepRunning = new AtomicBoolean(true);
         this.isRunning = new AtomicBoolean(true);
+        this.timeout = DEFAULT_TIMEOUT_MS;
     }
 
     /**
@@ -86,6 +89,11 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.dateRange[END] = end;
         this.keepRunning = new AtomicBoolean(true);
         this.isRunning = new AtomicBoolean(true);
+        this.timeout = DEFAULT_TIMEOUT_MS;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
     }
 
     @Override
@@ -103,6 +111,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.config = config;
     }
 
+
     @Override
     public void run() {
         try {
@@ -112,7 +121,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
-                    while(this.keepRunning.get() == true) {
+                    while(this.keepRunning.get()) {
                         try {
                             resultSet = provider.readCurrent();
                             if( resultSet.size() == 0 )
@@ -121,7 +130,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                                 zeros = 0;
                             }
                             flushResults(resultSet);
-                            if( zeros > (DEFAULT_TIMEOUT_MS / DEFAULT_SLEEP_TIME_MS))
+                            if( zeros > (timeout / DEFAULT_SLEEP_TIME_MS))
                                 this.keepRunning.set(false);
                             Thread.sleep(DEFAULT_SLEEP_TIME_MS);
                         } catch (InterruptedException e) {
@@ -149,6 +158,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         }
     }
 
+    @Override
     public boolean isRunning() {
         return this.isRunning.get();
     }