You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/10/23 23:26:53 UTC

[1/3] incubator-streams git commit: resolves STREAMS-362

Repository: incubator-streams
Updated Branches:
  refs/heads/master 8154b0406 -> f8e508c1f


resolves STREAMS-362


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bf70f28a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bf70f28a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bf70f28a

Branch: refs/heads/master
Commit: bf70f28a1c5331f0f4665ddc915df5f61c68bd16
Parents: a8e1cc7
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 3 20:27:01 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Sep 3 20:27:01 2015 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 14 ++++-----
 .../jsonschema/LocalRuntimeConfiguration.json   | 30 ++++++++++++++++++++
 2 files changed, 37 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf70f28a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index f31c7ed..3f65c65 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -280,7 +280,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             forcedShutDown = true;
         } finally{
             LOGGER.info("Stream has completed, pausing @ {}", System.currentTimeMillis());
-            Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
+            Uninterruptibles.sleepUninterruptibly(streamConfig.getShutdownPauseMs(), TimeUnit.MILLISECONDS);
             LOGGER.info("Stream has completed, shutting down @ {}", System.currentTimeMillis());
             stopInternal(forcedShutDown);
         }
@@ -310,10 +310,10 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.executor.shutdown();
         this.monitor.shutdown();
         try {
-            if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
+            if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownPauseMs(), TimeUnit.MILLISECONDS)){
                 this.executor.shutdownNow();
             }
-            if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
+            if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownPauseMs(), TimeUnit.MILLISECONDS)){
                 this.monitor.shutdownNow();
             }
         }catch (InterruptedException ie) {
@@ -334,13 +334,13 @@ public class LocalStreamBuilder implements StreamBuilder {
             shutDownTask(prov, streamsTasks);
         }
         //need to make this configurable
-        if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+        if(!this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
             this.executor.shutdownNow();
-            this.executor.awaitTermination(10, TimeUnit.SECONDS);
+            this.executor.awaitTermination(streamConfig.getExecutorShutdownWaitMs(), TimeUnit.MILLISECONDS);
         }
-        if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+        if(!this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS)) { // all threads should have terminated already.
             this.monitor.shutdownNow();
-            this.monitor.awaitTermination(5, TimeUnit.SECONDS);
+            this.monitor.awaitTermination(streamConfig.getMonitorShutdownWaitMs(), TimeUnit.MILLISECONDS);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bf70f28a/streams-runtimes/streams-runtime-local/src/main/jsonschema/LocalRuntimeConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/jsonschema/LocalRuntimeConfiguration.json b/streams-runtimes/streams-runtime-local/src/main/jsonschema/LocalRuntimeConfiguration.json
index 36177d9..1ba9bfa 100644
--- a/streams-runtimes/streams-runtime-local/src/main/jsonschema/LocalRuntimeConfiguration.json
+++ b/streams-runtimes/streams-runtime-local/src/main/jsonschema/LocalRuntimeConfiguration.json
@@ -14,6 +14,36 @@
         "monitoring": {
             "type" : "object",
             "javaType" : "org.apache.streams.local.monitoring.MonitoringConfiguration"
+        },
+        "executorShutdownPauseMs": {
+            "type": "integer",
+            "description": "Duration of pause before shutting down",
+            "default": 3000
+        },
+        "monitorShutdownPauseMs": {
+            "type": "integer",
+            "description": "Duration of pause before shutting down",
+            "default": 3000
+        },
+        "executorShutdownWaitMs": {
+            "type": "integer",
+            "description": "Duration of wait for shut down",
+            "default": 10000
+        },
+        "monitorShutdownWaitMs": {
+            "type": "integer",
+            "description": "Duration of wait for shut down",
+            "default": 5000
+        },
+        "shutdownPauseMs": {
+            "type": "integer",
+            "description": "Duration of pause before shutting down",
+            "default": 1000
+        },
+        "taskTimeoutMs": {
+            "type": "integer",
+            "description": "Max duration to allow tasks to terminate",
+            "default": 1000
         }
     }
 }
\ No newline at end of file


[3/3] incubator-streams git commit: Merge commit 'a3c2ef850d16a300d9e70e75548099ad31493754'

Posted by sb...@apache.org.
Merge commit 'a3c2ef850d16a300d9e70e75548099ad31493754'

* commit 'a3c2ef850d16a300d9e70e75548099ad31493754':
  one hunk was rejected
  resolves STREAMS-362


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f8e508c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f8e508c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f8e508c1

Branch: refs/heads/master
Commit: f8e508c1fa903bf4aca8c25bd387f2b86af8822a
Parents: 8154b04 a3c2ef8
Author: Steve Blackmon <sb...@apache.org>
Authored: Fri Oct 23 16:26:49 2015 -0500
Committer: Steve Blackmon <sb...@apache.org>
Committed: Fri Oct 23 16:26:49 2015 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 28 +++++++++++-------
 .../jsonschema/LocalRuntimeConfiguration.json   | 30 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f8e508c1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------


[2/3] incubator-streams git commit: one hunk was rejected

Posted by sb...@apache.org.
one hunk was rejected


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a3c2ef85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a3c2ef85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a3c2ef85

Branch: refs/heads/master
Commit: a3c2ef850d16a300d9e70e75548099ad31493754
Parents: bf70f28
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Sep 13 17:31:07 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Sep 13 17:31:07 2015 -0500

----------------------------------------------------------------------
 .../streams/local/builders/LocalStreamBuilder.java    | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a3c2ef85/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 3f65c65..71dc7a6 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -409,10 +409,11 @@ public class LocalStreamBuilder implements StreamBuilder {
                 }
                 for(StreamsTask task : tasks) {
                     int count = 0;
-                    while(count < 20 && task.isRunning()) {
-                        Thread.sleep(500);
+                    while(count < streamConfig.getTaskTimeoutMs() / 1000 && task.isRunning()) {
+                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                         count++;
                     }
+
                     if(task.isRunning()) {
                         LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString());
                     }
@@ -444,8 +445,13 @@ public class LocalStreamBuilder implements StreamBuilder {
             LOGGER.error("Exception while trying to shutdown Stream: {}", e);
             forceShutdown(tasks);
         } finally {
-            if(!systemExiting) {
-                detachShutdownHandler();
+            try {
+                 if(!systemExiting) {
+                      detachShutdownHandler();
+                  }
+               } catch( Throwable e3 ) {
+                 LOGGER.error("StopInternal caught Throwable: {}", e3);
+                       System.exit(1);
             }
         }
     }