You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/21 20:13:01 UTC

git commit: Re-enabled counts and tested that they work with configured timeout (STREAMS-90)

Repository: incubator-streams
Updated Branches:
  refs/heads/master 905cc8c77 -> 2ffdc754f


Re-enabled counts and tested that they work with configured timeout (STREAMS-90)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2ffdc754
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2ffdc754
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2ffdc754

Branch: refs/heads/master
Commit: 2ffdc754f32047af63e66976b1c4282f73caa6a9
Parents: 905cc8c
Author: mfranklin <mf...@apache.org>
Authored: Wed May 21 14:12:34 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed May 21 14:12:34 2014 -0400

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |  3 +-
 .../local/tasks/StreamsProviderTask.java        |  5 +-
 .../apache/streams/local/tasks/StreamsTask.java | 10 +++-
 .../local/builders/LocalStreamBuilderTest.java  | 37 +++++++++++-
 .../test/providers/EmptyResultSetProvider.java  | 63 ++++++++++++++++++++
 5 files changed, 112 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index d313b3f..cbfc463 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -347,7 +347,8 @@ public class LocalStreamBuilder implements StreamBuilder {
     }
 
     protected int getTimeout() {
-        return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : 3000;
+    //Set the timeout if it is configured, otherwise signal downstream components to use their default
+        return streamConfig != null && streamConfig.containsKey(TIMEOUT_KEY) ? (Integer)streamConfig.get(TIMEOUT_KEY) : -1;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index cc4844c..ff72e1b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -32,8 +32,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private static final int START = 0;
     private static final int END = 1;
 
-    private static final int DEFAULT_TIMEOUT_MS = 1000000;
-
     private StreamsProvider provider;
     private AtomicBoolean keepRunning;
     private Type type;
@@ -118,6 +116,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             this.provider.prepare(this.config); //TODO allow for configuration objects
             StreamsResultSet resultSet = null;
             this.isRunning.set(true);
+            long maxZeros = timeout / DEFAULT_SLEEP_TIME_MS;
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
@@ -131,7 +130,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                             }
                             flushResults(resultSet);
                             // the way this works needs to change...
-                            if( zeros > (timeout))
+                            if(zeros > maxZeros)
                                 this.keepRunning.set(false);
                             Thread.sleep(DEFAULT_SLEEP_TIME_MS);
                         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index e7f5a2c..402d8be 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -11,8 +11,16 @@ import java.util.Queue;
  * in local mode.
  */
 public interface StreamsTask extends Runnable{
+    /**
+     * Represents the default time to sleep a task before polling for the next set of work items
+     */
+    static final long DEFAULT_SLEEP_TIME_MS = 5000;
 
-    public static final long DEFAULT_SLEEP_TIME_MS = 5000;
+    /**
+     * Represents the default amount of time to wait for new data to flow through the system before assuming it should
+     * shut down
+     */
+    static final int DEFAULT_TIMEOUT_MS = 100000; //Will result in a default timeout of 100 seconds if used
 
     /**
      * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index 0bdaf61..9801d95 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -1,9 +1,14 @@
 package org.apache.streams.local.builders;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.core.test.providers.NumericMessageProvider;
 import org.apache.streams.core.test.writer.SystemOutWriter;
+import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.test.providers.EmptyResultSetProvider;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -11,6 +16,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Scanner;
 
 import static org.junit.Assert.*;
@@ -156,9 +162,38 @@ public class LocalStreamBuilderTest {
             ++count;
             scanner.nextLine();
         }
-        assertThat(count, greaterThan(numDatums*2)); // using > because number of lines in system.out is non-deterministic
+        assertThat(count, greaterThan(numDatums * 2)); // using > because number of lines in system.out is non-deterministic
 
     }
 
+    @Test
+    public void testDefaultProviderTimeout() {
+        long start = System.currentTimeMillis();
+        StreamBuilder builder = new LocalStreamBuilder();
+        builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+                .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+                .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+                .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+        builder.start();
+        long end = System.currentTimeMillis();
+        //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
+        assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(StreamsTask.DEFAULT_TIMEOUT_MS), lessThanOrEqualTo(2 * (StreamsTask.DEFAULT_TIMEOUT_MS)))));
+    }
 
+    @Test
+    public void testConfiguredProviderTimeout() {
+        Map<String, Object> config = Maps.newHashMap();
+        int timeout = 10000;
+        config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+        long start = System.currentTimeMillis();
+        StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(), config);
+        builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
+                .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
+                .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+                .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+        builder.start();
+        long end = System.currentTimeMillis();
+        //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
+        assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2ffdc754/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
new file mode 100644
index 0000000..d8dc4eb
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.local.test.providers;
+
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+
+/**
+ * Provides new, empty instances of result set.
+ */
+public class EmptyResultSetProvider implements StreamsProvider {
+    @Override
+    public void startStream() {
+        //NOP
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        //NOP
+    }
+
+    @Override
+    public void cleanUp() {
+        //NOP
+    }
+}