You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/10 19:00:26 UTC

[1/6] git commit: STREAMS-131 | Updated stop to cleanup tasks and shutdown handler

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-131 [created] 32bf3c3fa


STREAMS-131 | Updated stop to cleanup tasks and shutdown handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a2e0bfc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a2e0bfc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a2e0bfc2

Branch: refs/heads/STREAMS-131
Commit: a2e0bfc248525a80f6db21540b37c1ba2aa8d274
Parents: 38ed41a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:32:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:32:00 2014 -0400

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 34 +++++++++++++-------
 .../local/tasks/StreamsPersistWriterTask.java   |  1 +
 .../local/tasks/StreamsProcessorTask.java       |  1 +
 .../local/tasks/StreamsProviderTask.java        |  1 +
 4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index c287a34..81d325f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -55,6 +55,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private int monitorTasks;
     private LocalStreamProcessMonitorThread monitorThread;
     private Map<String, List<StreamsTask>> tasks;
+    private Thread shutdownHook;
 
     /**
      *
@@ -91,6 +92,14 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.streamConfig = streamConfig;
         this.totalTasks = 0;
         this.monitorTasks = 0;
+        final LocalStreamBuilder self = this;
+        this.shutdownHook = new Thread() {
+            @Override
+            public void run() {
+                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
+                self.stop();
+            }
+        };
     }
 
     @Override
@@ -186,24 +195,23 @@ public class LocalStreamBuilder implements StreamBuilder {
                     Thread.sleep(3000);
                 }
             }
-            LOGGER.debug("Components are no longer running or timed out due to completion");
-            shutdown(tasks);
+            LOGGER.debug("Components are no longer running or timed out");
         } catch (InterruptedException e){
-            forceShutdown(tasks);
+            LOGGER.warn("Runtime interrupted.  Beginning shutdown");
+        } finally{
+            stop();
         }
 
     }
 
     private void attachShutdownHandler() {
-        final LocalStreamBuilder self = this;
         LOGGER.debug("Attaching shutdown handler");
-        Runtime.getRuntime().addShutdownHook(new Thread(){
-            @Override
-            public void run() {
-                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
-                self.stop();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    private void detachShutdownHandler() {
+        LOGGER.debug("Detaching shutdown handler");
+        Runtime.getRuntime().removeShutdownHook(shutdownHook);
     }
 
     protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
@@ -232,7 +240,7 @@ public class LocalStreamBuilder implements StreamBuilder {
 
     protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
         LOGGER.info("Attempting to shutdown tasks");
-        monitorThread.shutdown();
+        this.monitorThread.shutdown();
         this.executor.shutdown();
         //complete stream shut down gracfully
         for(StreamComponent prov : this.providers.values()) {
@@ -333,6 +341,8 @@ public class LocalStreamBuilder implements StreamBuilder {
             shutdown(tasks);
         } catch (Exception e) {
             forceShutdown(tasks);
+        } finally {
+            detachShutdownHandler();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 4ec5fe9..bdda66a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,6 +123,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
+        this.writer.cleanUp();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 0d3f54a..7fbad16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,6 +63,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
+        this.processor.cleanUp();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..9eba6cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,6 +110,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     public void stopTask() {
         LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
         this.keepRunning.set(false);
+        this.provider.cleanUp();
     }
 
     @Override


[3/6] git commit: STREAMS-131 | Closing connections

Posted by mf...@apache.org.
STREAMS-131 | Closing connections


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78aa10f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78aa10f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78aa10f4

Branch: refs/heads/STREAMS-131
Commit: 78aa10f4f371b1b7aee1c4e21231956dad015268
Parents: a5e6012
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 10:47:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 10:47:29 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 +++
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 4 +++-
 .../org/apache/streams/elasticsearch/ElasticsearchQuery.java     | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4196a46..7ba9b33 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -128,6 +128,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     public void cleanUp() {
         this.shutdownAndAwaitTermination(executor);
         LOGGER.info("PersistReader done");
+        if(elasticsearchQuery != null) {
+            elasticsearchQuery.cleanUp();
+        }
     }
 
     //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 72145e1..caf5ec3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,6 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             refreshIndexes();
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+            manager.stop();
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.
@@ -248,7 +249,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                 Thread.sleep(1);
                 counter++;
             } catch(InterruptedException ie) {
-                // No Operation
+                LOGGER.warn("Catchup was interrupted.  Data may be lost");
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index fbc62c4..4dd3d28 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -247,6 +247,10 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     public void remove() {
     }
 
+    public void cleanUp() {
+        this.elasticsearchClientManager.stop();
+    }
+
     protected boolean isCompleted() {
         return totalRead >= this.limit && hasRecords();
     }


[6/6] git commit: STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup

Posted by mf...@apache.org.
STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/32bf3c3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/32bf3c3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/32bf3c3f

Branch: refs/heads/STREAMS-131
Commit: 32bf3c3fa97473b0eea0e272fd67023eef0a0f65
Parents: b45256c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 13:00:07 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 13:00:07 2014 -0400

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistWriter.java       | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/32bf3c3f/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index dc8d9cf..773327c 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -130,13 +130,30 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
         try {
             flush();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error flushing", e);
         }
         try {
             close();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error closing", e);
         }
+        try {
+            backgroundFlushTask.shutdown();
+            // Wait a while for existing tasks to terminate
+            if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+                backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+                    LOGGER.error("Stream did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            backgroundFlushTask.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+
     }
 
     @Override


[4/6] git commit: STREAMS-131 | Fixed bug in elasticsearch client manager close

Posted by mf...@apache.org.
STREAMS-131 | Fixed bug in elasticsearch client manager close


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d1fc31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d1fc31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d1fc31a

Branch: refs/heads/STREAMS-131
Commit: 0d1fc31a87d50fed37860667e34b5d0e1fff6b35
Parents: 78aa10f
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:12:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:12:18 2014 -0400

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchClientManager.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d1fc31a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 340d2b9..d107e70 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -80,7 +80,7 @@ public class ElasticsearchClientManager {
     }
 
     public boolean isOnOrAfterVersion(Version version) {
-        return ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getVersion().onOrAfter(version);
+        return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
     }
 
     public void start() throws Exception {
@@ -110,12 +110,12 @@ public class ElasticsearchClientManager {
     public synchronized void stop() {
         // Terminate the elasticsearch cluster
         // Check to see if we have a client.
-        if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.toString())) {
+        if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
             // Close the client
-            ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getClient().close();
+            ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
 
             // Remove it so that it isn't in memory any more.
-            ALL_CLIENTS.remove(this.elasticsearchConfiguration.toString());
+            ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
         }
     }
 


[5/6] git commit: STREAMS-131 | Updated elasticsearch persist writer to close timer

Posted by mf...@apache.org.
STREAMS-131 | Updated elasticsearch persist writer to close timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b45256c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b45256c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b45256c3

Branch: refs/heads/STREAMS-131
Commit: b45256c3ab87ba385fdee5170528dcffabecac57
Parents: 0d1fc31
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:20:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:20:03 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b45256c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index caf5ec3..5b90e8b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -176,6 +176,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
             manager.stop();
+            timer.cancel();
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.


[2/6] git commit: STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down

Posted by mf...@apache.org.
STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5e60126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5e60126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5e60126

Branch: refs/heads/STREAMS-131
Commit: a5e60126847d40c079ac6d05dc5ece92f62607e9
Parents: a2e0bfc
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:52:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:52:46 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5e60126/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 81d325f..25f9fe7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -97,7 +97,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             @Override
             public void run() {
                 LOGGER.debug("Shutdown hook received.  Beginning shutdown");
-                self.stop();
+                self.stopInternal(true);
             }
         };
     }
@@ -337,12 +337,18 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void stop() {
+        stopInternal(false);
+    }
+
+    protected void stopInternal(boolean systemExiting) {
         try {
             shutdown(tasks);
         } catch (Exception e) {
             forceShutdown(tasks);
         } finally {
-            detachShutdownHandler();
+            if(!systemExiting) {
+                detachShutdownHandler();
+            }
         }
     }