You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/21 20:13:01 UTC
git commit: Re-enabled counts and tested that they work with
configured timeout (STREAMS-90)
Repository: incubator-streams
Updated Branches:
refs/heads/master 905cc8c77 -> 2ffdc754f
Re-enabled counts and tested that they work with configured timeout (STREAMS-90)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2ffdc754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2ffdc754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2ffdc754
Branch: refs/heads/master
Commit: 2ffdc754f32047af63e66976b1c4282f73caa6a9
Parents: 905cc8c
Author: mfranklin <mf...@apache.org>
Authored: Wed May 21 14:12:34 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed May 21 14:12:34 2014 -0400
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 3 +-
.../local/tasks/StreamsProviderTask.java | 5 +-
.../apache/streams/local/tasks/StreamsTask.java | 10 +++-
.../local/builders/LocalStreamBuilderTest.java | 37 +++++++++++-
.../test/providers/EmptyResultSetProvider.java | 63 ++++++++++++++++++++
5 files changed, 112 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index d313b3f..cbfc463 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -347,7 +347,8 @@ public class LocalStreamBuilder implements StreamBuilder {
}
protected int getTimeout() {
- return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : 3000;
+ //Set the timeout if it is configured, otherwise signal downstream components to use their default
+ return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index cc4844c..ff72e1b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -32,8 +32,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private static final int START = 0;
private static final int END = 1;
- private static final int DEFAULT_TIMEOUT_MS = 1000000;
-
private StreamsProvider provider;
private AtomicBoolean keepRunning;
private Type type;
@@ -118,6 +116,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.provider.prepare(this.config); //TODO allow for configuration objects
StreamsResultSet resultSet = null;
this.isRunning.set(true);
+ long maxZeros = timeout / DEFAULT_SLEEP_TIME_MS;
switch(this.type) {
case PERPETUAL: {
provider.startStream();
@@ -131,7 +130,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
}
flushResults(resultSet);
// the way this works needs to change...
- if( zeros > (timeout))
+ if(zeros > maxZeros)
this.keepRunning.set(false);
Thread.sleep(DEFAULT_SLEEP_TIME_MS);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index e7f5a2c..402d8be 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -11,8 +11,16 @@ import java.util.Queue;
* in local mode.
*/
public interface StreamsTask extends Runnable{
+ /**
+ * Represents the default time to sleep a task before polling for the next set of work items
+ */
+ static final long DEFAULT_SLEEP_TIME_MS = 5000;
- public static final long DEFAULT_SLEEP_TIME_MS = 5000;
+ /**
+ * Represents the default amount of time to wait for new data to flow through the system before assuming it should
+ * shut down
+ */
+ static final int DEFAULT_TIMEOUT_MS = 100000; //Will result in a default timeout of 100 seconds if used
/**
* Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 0bdaf61..9801d95 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -1,9 +1,14 @@
package org.apache.streams.local.builders;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.core.test.providers.NumericMessageProvider;
import org.apache.streams.core.test.writer.SystemOutWriter;
+import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.test.providers.EmptyResultSetProvider;
import org.junit.Before;
import org.junit.Test;
@@ -11,6 +16,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.HashSet;
+import java.util.Map;
import java.util.Scanner;
import static org.junit.Assert.*;
@@ -156,9 +162,38 @@ public class LocalStreamBuilderTest {
++count;
scanner.nextLine();
}
- assertThat(count, greaterThan(numDatums*2)); // using > because number of lines in system.out is non-deterministic
+ assertThat(count, greaterThan(numDatums * 2)); // using > because number of lines in system.out is non-deterministic
}
+ @Test
+ public void testDefaultProviderTimeout() {
+ long start = System.currentTimeMillis();
+ StreamBuilder builder = new LocalStreamBuilder();
+ builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+ builder.start();
+ long end = System.currentTimeMillis();
+ //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
+ assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(StreamsTask.DEFAULT_TIMEOUT_MS), lessThanOrEqualTo(2 * (StreamsTask.DEFAULT_TIMEOUT_MS)))));
+ }
+ @Test
+ public void testConfiguredProviderTimeout() {
+ Map<String, Object> config = Maps.newHashMap();
+ int timeout = 10000;
+ config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+ long start = System.currentTimeMillis();
+ StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(), config);
+ builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+ .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+ .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+ builder.start();
+ long end = System.currentTimeMillis();
+ //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
+ assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
new file mode 100644
index 0000000..d8dc4eb
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.local.test.providers;
+
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+
+/**
+ * Provides new, empty instances of result set.
+ */
+public class EmptyResultSetProvider implements StreamsProvider {
+ @Override
+ public void startStream() {
+ //NOP
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ //NOP
+ }
+
+ @Override
+ public void cleanUp() {
+ //NOP
+ }
+}