You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/29 16:14:11 UTC
[3/3] git commit: Added configurable timeout to Providers
Added configurable timeout to Providers
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/85510a76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/85510a76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/85510a76
Branch: refs/heads/master
Commit: 85510a760d6fa3ab4b8d6afd833d220575d30f1e
Parents: fdba6ff
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 29 08:49:21 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 29 10:13:59 2014 -0400
----------------------------------------------------------------------
.../streams/local/builders/LocalStreamBuilder.java | 11 +++++++++--
.../streams/local/builders/StreamComponent.java | 8 +++++++-
.../streams/local/tasks/StreamsProviderTask.java | 14 ++++++++++++--
3 files changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index bf1abe6..8e688ba 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
*/
public class LocalStreamBuilder implements StreamBuilder {
+ public static final String TIMEOUT_KEY = "TIMEOUT";
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;
private Queue<StreamsDatum> queue;
@@ -150,7 +151,7 @@ public class LocalStreamBuilder implements StreamBuilder {
int tasks = comp.getNumTasks();
List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
for(int i=0; i < tasks; ++i) {
- StreamsTask task = comp.createConnectedTask();
+ StreamsTask task = comp.createConnectedTask(getTimeout());
task.setStreamConfig(this.streamConfig);
this.executor.submit(task);
compTasks.add(task);
@@ -162,7 +163,7 @@ public class LocalStreamBuilder implements StreamBuilder {
streamsTasks.put(comp.getId(), compTasks);
}
for(StreamComponent prov : this.providers.values()) {
- StreamsTask task = prov.createConnectedTask();
+ StreamsTask task = prov.createConnectedTask(getTimeout());
task.setStreamConfig(this.streamConfig);
this.executor.submit(task);
provTasks.put(prov.getId(), (StreamsProviderTask) task);
@@ -176,6 +177,9 @@ public class LocalStreamBuilder implements StreamBuilder {
for(StreamsProviderTask task : provTasks.values()) {
isRunning = isRunning || task.isRunning();
}
+ for(StreamComponent task: components.values()) {
+ isRunning = isRunning || task.getInBoundQueue().size() > 0;
+ }
if(isRunning) {
Thread.sleep(3000);
}
@@ -295,5 +299,8 @@ public class LocalStreamBuilder implements StreamBuilder {
return (Queue<StreamsDatum>) SerializationUtil.cloneBySerialization(this.queue);
}
+ protected int getTimeout() {
+ return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : 3000;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index f5e9978..5508cb8 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -160,9 +160,11 @@ public class StreamComponent {
/**
* Creates a {@link org.apache.streams.local.tasks.StreamsTask} that is running a clone of this component whose
* inbound and outbound queues are appropriately connected to the parent and child nodes.
+ *
* @return StreamsTask for this component
+ * @param timeout The timeout to use in milliseconds for any tasks that support configurable timeout
*/
- public StreamsTask createConnectedTask() {
+ public StreamsTask createConnectedTask(int timeout) {
StreamsTask task;
if(this.processor != null) {
if(this.numTasks > 1) {
@@ -201,6 +203,10 @@ public class StreamComponent {
task = new StreamsProviderTask(prov, this.sequence);
else
task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1]);
+ //Adjust the timeout if necessary
+ if(timeout > 0) {
+ ((StreamsProviderTask)task).setTimeout(timeout);
+ }
for(Queue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/85510a76/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 5cf515c..add44bb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -42,6 +42,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private Map<String, Object> config;
private AtomicBoolean isRunning;
+ private int timeout;
private int zeros = 0;
private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -57,6 +58,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.type = Type.READ_CURRENT;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
+ this.timeout = DEFAULT_TIMEOUT_MS;
}
/**
@@ -70,6 +72,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.sequence = sequence;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
+ this.timeout = DEFAULT_TIMEOUT_MS;
}
/**
@@ -86,6 +89,11 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.dateRange[END] = end;
this.keepRunning = new AtomicBoolean(true);
this.isRunning = new AtomicBoolean(true);
+ this.timeout = DEFAULT_TIMEOUT_MS;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
}
@Override
@@ -103,6 +111,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.config = config;
}
+
@Override
public void run() {
try {
@@ -112,7 +121,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
switch(this.type) {
case PERPETUAL: {
provider.startStream();
- while(this.keepRunning.get() == true) {
+ while(this.keepRunning.get()) {
try {
resultSet = provider.readCurrent();
if( resultSet.size() == 0 )
@@ -121,7 +130,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
zeros = 0;
}
flushResults(resultSet);
- if( zeros > (DEFAULT_TIMEOUT_MS / DEFAULT_SLEEP_TIME_MS))
+ if( zeros > (timeout / DEFAULT_SLEEP_TIME_MS))
this.keepRunning.set(false);
Thread.sleep(DEFAULT_SLEEP_TIME_MS);
} catch (InterruptedException e) {
@@ -149,6 +158,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
}
}
+ @Override
public boolean isRunning() {
return this.isRunning.get();
}