You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/09/03 18:06:56 UTC

git commit: STREAMS-161 | Setting started to true during early exception

Repository: incubator-streams
Updated Branches:
  refs/heads/master 67ab1ff6e -> fe4cec680


STREAMS-161 | Setting started to true during early exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fe4cec68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fe4cec68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fe4cec68

Branch: refs/heads/master
Commit: fe4cec6806379c51864dee2f2e46da95e5a1264d
Parents: 67ab1ff
Author: mfranklin <mf...@apache.org>
Authored: Wed Sep 3 12:02:40 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Sep 3 12:02:40 2014 -0400

----------------------------------------------------------------------
 streams-runtimes/streams-runtime-local/pom.xml  |   6 +
 .../local/tasks/StreamsProviderTask.java        |  17 ++-
 .../local/tasks/StreamsProviderTaskTest.java    | 147 +++++++++++++++++++
 3 files changed, 167 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fe4cec68/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index b7ddb9a..0d9138d 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -83,6 +83,12 @@
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fe4cec68/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 8809d5a..d040c4b 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -60,6 +60,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private Map<String, Object> config;
 
     private int timeout;
+    private long sleepTime;
     private int zeros = 0;
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
 
@@ -74,6 +75,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         else
             this.type = Type.READ_CURRENT;
         this.timeout = DEFAULT_TIMEOUT_MS;
+        this.sleepTime = DEFAULT_SLEEP_TIME_MS;
     }
 
     /**
@@ -86,6 +88,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.type = Type.READ_NEW;
         this.sequence = sequence;
         this.timeout = DEFAULT_TIMEOUT_MS;
+        this.sleepTime = DEFAULT_SLEEP_TIME_MS;
     }
 
     /**
@@ -101,12 +104,17 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.dateRange[START] = start;
         this.dateRange[END] = end;
         this.timeout = DEFAULT_TIMEOUT_MS;
+        this.sleepTime = DEFAULT_SLEEP_TIME_MS;
     }
 
     public void setTimeout(int timeout) {
         this.timeout = timeout;
     }
 
+    public void setSleepTime(long sleepTime) {
+        this.sleepTime = sleepTime;
+    }
+
     @Override
     public void stopTask() {
         LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
@@ -130,7 +138,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             this.provider.prepare(this.config); //TODO allow for configuration objects
             StreamsResultSet resultSet = null;
             //Negative values mean we want to run forever
-            long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / DEFAULT_SLEEP_TIME_MS);
+            long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime);
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
@@ -147,7 +155,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                             // the way this works needs to change...
                             if(zeros > maxZeros)
                                 this.keepRunning.set(false);
-                            Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                            Thread.sleep(sleepTime);
                         } catch (InterruptedException e) {
                             LOGGER.warn("Thread interrupted");
                             this.keepRunning.set(false);
@@ -177,6 +185,9 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         } finally {
             LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
             this.provider.cleanUp();
+            //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception
+            //before started would normally be set to true in the run method.
+            this.started.set(true);
             this.keepRunning.set(false);
         }
     }
@@ -209,7 +220,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             }
             else {
                 try {
-                    Thread.sleep(DEFAULT_SLEEP_TIME_MS);
+                    Thread.sleep(sleepTime);
                 } catch (InterruptedException e) {
                     LOGGER.warn("Thread interrupted");
                     this.keepRunning.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fe4cec68/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
new file mode 100644
index 0000000..2cc4fed
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.local.tasks;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.util.ComponentUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the StreamsProviderTask.
+ */
+public class StreamsProviderTaskTest {
+
+    protected StreamsProvider mockProvider;
+    protected ExecutorService pool;
+
+    @Before
+    public void setup() {
+        mockProvider = mock(StreamsProvider.class);
+        pool = Executors.newFixedThreadPool(1);
+    }
+
+    @Test
+    public void runPerpetual() {
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        when(mockProvider.isRunning()).thenReturn(true);
+        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+        task.setTimeout(500);
+        task.setSleepTime(10);
+        task.run();
+        //Seeing at least 2 calls to readCurrent() means the task was correctly set to perpetual mode
+        verify(mockProvider, atLeast(2)).readCurrent();
+        verify(mockProvider, atMost(1)).prepare(null);
+    }
+
+    @Test
+    public void flushes() {
+        Queue<StreamsDatum> out = new LinkedBlockingQueue<>();
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        when(mockProvider.isRunning()).thenReturn(true);
+        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
+        task.setTimeout(100);
+        task.setSleepTime(10);
+        task.getOutputQueues().add(out);
+        task.run();
+        assertThat(out.size(), is(equalTo(3)));
+    }
+
+    protected Queue<StreamsDatum> getQueue(int numElems) {
+        Queue<StreamsDatum> results = new LinkedBlockingQueue<>();
+        for(int i=0; i<numElems; i++) {
+            results.add(new StreamsDatum(Math.random()));
+        }
+        return results;
+    }
+
+    @Test
+    public void runNonPerpetual() {
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, false);
+        when(mockProvider.isRunning()).thenReturn(true);
+        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+        task.setTimeout(500);
+        task.setSleepTime(10);
+        task.run();
+        //In read current mode, readCurrent() should be called only once (the check below only enforces at least once)
+        verify(mockProvider, atLeast(1)).readCurrent();
+        verify(mockProvider, atMost(1)).prepare(null);
+    }
+
+    @Test
+    public void stoppable() throws InterruptedException {
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        when(mockProvider.isRunning()).thenReturn(true);
+        when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
+        task.setTimeout(-1);
+        task.setSleepTime(10);
+        Future<?> taskResult = pool.submit(task);
+
+        //After a short delay, tell the task to stop, then wait until its future is done or the retry limit is reached
+        int count = 0;
+        do {
+            Thread.sleep(100);
+            if(count == 0) {
+                task.stopTask();
+            }
+        } while(++count < 10 && !taskResult.isDone());
+        verifyNotRunning(task, taskResult);
+
+    }
+
+    @Test
+    public void earlyException() throws InterruptedException {
+        StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
+        when(mockProvider.isRunning()).thenReturn(true);
+        doThrow(new RuntimeException()).when(mockProvider).prepare(null);
+        task.setTimeout(-1);
+        task.setSleepTime(10);
+        Future<?> taskResult = pool.submit(task);
+        int count = 0;
+        while(++count < 10 && !taskResult.isDone()) {
+            Thread.sleep(100);
+        }
+        verifyNotRunning(task, taskResult);
+    }
+
+    protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) {
+        //Make sure the task is reporting that it is complete and that the run method returned
+        if(taskResult.isDone()) {
+            assertThat(task.isRunning(), is(false));
+        } else {
+            ComponentUtils.shutdownExecutor(pool, 0, 10);
+            fail();
+        }
+    }
+}