You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/21 20:29:54 UTC

[01/11] git commit: STREAMS-131 | Updated stop to cleanup tasks and shutdown handler

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7c5e29007 -> da3bc91a8


STREAMS-131 | Updated stop to cleanup tasks and shutdown handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a2e0bfc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a2e0bfc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a2e0bfc2

Branch: refs/heads/master
Commit: a2e0bfc248525a80f6db21540b37c1ba2aa8d274
Parents: 38ed41a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:32:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:32:00 2014 -0400

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 34 +++++++++++++-------
 .../local/tasks/StreamsPersistWriterTask.java   |  1 +
 .../local/tasks/StreamsProcessorTask.java       |  1 +
 .../local/tasks/StreamsProviderTask.java        |  1 +
 4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index c287a34..81d325f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -55,6 +55,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private int monitorTasks;
     private LocalStreamProcessMonitorThread monitorThread;
     private Map<String, List<StreamsTask>> tasks;
+    private Thread shutdownHook;
 
     /**
      *
@@ -91,6 +92,14 @@ public class LocalStreamBuilder implements StreamBuilder {
         this.streamConfig = streamConfig;
         this.totalTasks = 0;
         this.monitorTasks = 0;
+        final LocalStreamBuilder self = this;
+        this.shutdownHook = new Thread() {
+            @Override
+            public void run() {
+                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
+                self.stop();
+            }
+        };
     }
 
     @Override
@@ -186,24 +195,23 @@ public class LocalStreamBuilder implements StreamBuilder {
                     Thread.sleep(3000);
                 }
             }
-            LOGGER.debug("Components are no longer running or timed out due to completion");
-            shutdown(tasks);
+            LOGGER.debug("Components are no longer running or timed out");
         } catch (InterruptedException e){
-            forceShutdown(tasks);
+            LOGGER.warn("Runtime interrupted.  Beginning shutdown");
+        } finally{
+            stop();
         }
 
     }
 
     private void attachShutdownHandler() {
-        final LocalStreamBuilder self = this;
         LOGGER.debug("Attaching shutdown handler");
-        Runtime.getRuntime().addShutdownHook(new Thread(){
-            @Override
-            public void run() {
-                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
-                self.stop();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    private void detachShutdownHandler() {
+        LOGGER.debug("Detaching shutdown handler");
+        Runtime.getRuntime().removeShutdownHook(shutdownHook);
     }
 
     protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
@@ -232,7 +240,7 @@ public class LocalStreamBuilder implements StreamBuilder {
 
     protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
         LOGGER.info("Attempting to shutdown tasks");
-        monitorThread.shutdown();
+        this.monitorThread.shutdown();
         this.executor.shutdown();
         //complete stream shut down gracfully
         for(StreamComponent prov : this.providers.values()) {
@@ -333,6 +341,8 @@ public class LocalStreamBuilder implements StreamBuilder {
             shutdown(tasks);
         } catch (Exception e) {
             forceShutdown(tasks);
+        } finally {
+            detachShutdownHandler();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 4ec5fe9..bdda66a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,6 +123,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
+        this.writer.cleanUp();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 0d3f54a..7fbad16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,6 +63,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
+        this.processor.cleanUp();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..9eba6cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,6 +110,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     public void stopTask() {
         LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
         this.keepRunning.set(false);
+        this.provider.cleanUp();
     }
 
     @Override


[07/11] git commit: STREAMS-131 | Removed errant change of cleanup call location

Posted by mf...@apache.org.
STREAMS-131 | Removed errant change of cleanup call location


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d7c073c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d7c073c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d7c073c5

Branch: refs/heads/master
Commit: d7c073c58888a0c61fa7d0e4eed4ab8277ca2a6d
Parents: 32bf3c3
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 14:55:25 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 14:55:25 2014 -0400

----------------------------------------------------------------------
 .../org/apache/streams/local/tasks/StreamsPersistWriterTask.java  | 1 -
 .../java/org/apache/streams/local/tasks/StreamsProcessorTask.java | 3 +--
 .../java/org/apache/streams/local/tasks/StreamsProviderTask.java  | 1 -
 3 files changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index bdda66a..4ec5fe9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,7 +123,6 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
-        this.writer.cleanUp();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 7fbad16..f3004b4 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,7 +63,6 @@ public class StreamsProcessorTask extends BaseStreamsTask {
     @Override
     public void stopTask() {
         this.keepRunning.set(false);
-        this.processor.cleanUp();
     }
 
     @Override
@@ -106,8 +105,8 @@ public class StreamsProcessorTask extends BaseStreamsTask {
             }
 
         } finally {
-            this.processor.cleanUp();
             this.isRunning.set(false);
+            this.processor.cleanUp();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 9eba6cd..88f6f48 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,7 +110,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     public void stopTask() {
         LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
         this.keepRunning.set(false);
-        this.provider.cleanUp();
     }
 
     @Override


[06/11] git commit: STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup

Posted by mf...@apache.org.
STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/32bf3c3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/32bf3c3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/32bf3c3f

Branch: refs/heads/master
Commit: 32bf3c3fa97473b0eea0e272fd67023eef0a0f65
Parents: b45256c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 13:00:07 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 13:00:07 2014 -0400

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistWriter.java       | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/32bf3c3f/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index dc8d9cf..773327c 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -130,13 +130,30 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
         try {
             flush();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error flushing", e);
         }
         try {
             close();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error closing", e);
         }
+        try {
+            backgroundFlushTask.shutdown();
+            // Wait a while for existing tasks to terminate
+            if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+                backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+                    LOGGER.error("Stream did not terminate");
+                }
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            backgroundFlushTask.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+
     }
 
     @Override


[10/11] git commit: STREAMS-131 | Updated per code review comment

Posted by mf...@apache.org.
STREAMS-131 | Updated per code review comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e1f8eea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e1f8eea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e1f8eea7

Branch: refs/heads/master
Commit: e1f8eea7c1eb13608f6396472f7a9df3a07a4b44
Parents: c343a1b
Author: mfranklin <mf...@apache.org>
Authored: Fri Jul 18 12:13:23 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jul 18 12:13:23 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/streams/local/tasks/StreamsProviderTask.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e1f8eea7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..f5189eb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -174,8 +174,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             LOGGER.error("Error in processing provider stream", e);
         } finally {
             LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
-            this.keepRunning.set(false);
             this.provider.cleanUp();
+            this.keepRunning.set(false);
         }
     }
 


[05/11] git commit: STREAMS-131 | Updated elasticsearch persist writer to close timer

Posted by mf...@apache.org.
STREAMS-131 | Updated elasticsearch persist writer to close timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b45256c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b45256c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b45256c3

Branch: refs/heads/master
Commit: b45256c3ab87ba385fdee5170528dcffabecac57
Parents: 0d1fc31
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:20:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:20:03 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b45256c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index caf5ec3..5b90e8b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -176,6 +176,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
             manager.stop();
+            timer.cancel();
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.


[09/11] git commit: STREAMS-131 | Fixed incorrect closing of Sysomos reader task

Posted by mf...@apache.org.
STREAMS-131 | Fixed incorrect closing of Sysomos reader task


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c343a1b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c343a1b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c343a1b0

Branch: refs/heads/master
Commit: c343a1b0063eeaac8f08ee84cc4fafcc5923d266
Parents: d41890d
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 09:02:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 09:02:46 2014 -0400

----------------------------------------------------------------------
 .../org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c343a1b0/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 1dde4fa..3478671 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,7 +79,6 @@ public class SysomosHeartbeatStream implements Runnable {
             executeRun();
         } catch (Exception e) {
             LOGGER.error("Error executing heartbeat stream", e);
-        } finally {
             shutdown();
         }
     }


[11/11] git commit: Merge remote-tracking branch 'origin/STREAMS-131'

Posted by mf...@apache.org.
Merge remote-tracking branch 'origin/STREAMS-131'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/da3bc91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/da3bc91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/da3bc91a

Branch: refs/heads/master
Commit: da3bc91a888df7c4d2fe3fb38cf340cc7de00477
Parents: 7c5e290 e1f8eea
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 21 13:31:11 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 21 13:31:11 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchClientManager.java             |  8 ++--
 .../ElasticsearchPersistReader.java             |  3 ++
 .../ElasticsearchPersistWriter.java             |  4 +-
 .../elasticsearch/ElasticsearchQuery.java       |  3 ++
 .../streams/mongo/MongoPersistWriter.java       | 21 +++++++++-
 .../provider/SysomosHeartbeatStream.java        |  1 -
 .../local/builders/LocalStreamBuilder.java      | 40 ++++++++++++++------
 .../local/tasks/StreamsProcessorTask.java       |  2 +-
 .../local/tasks/StreamsProviderTask.java        |  2 +-
 9 files changed, 62 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[08/11] git commit: STREAMS-131 | Because the elastic search components use the same static client, closing the reader can close the writer

Posted by mf...@apache.org.
STREAMS-131 | Because the elastic search components use the same static client, closing the reader can close the writer


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d41890d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d41890d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d41890d9

Branch: refs/heads/master
Commit: d41890d9adafc94a7d09227e57d0b811f293dbb9
Parents: d7c073c
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 08:23:01 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 08:23:01 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java    | 1 -
 .../java/org/apache/streams/elasticsearch/ElasticsearchQuery.java   | 1 -
 2 files changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d41890d9/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 5b90e8b..05b0ef2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,7 +175,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             refreshIndexes();
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
-            manager.stop();
             timer.cancel();
 
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d41890d9/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 4dd3d28..defd9dc 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -248,7 +248,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     }
 
     public void cleanUp() {
-        this.elasticsearchClientManager.stop();
     }
 
     protected boolean isCompleted() {


[02/11] git commit: STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down

Posted by mf...@apache.org.
STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5e60126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5e60126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5e60126

Branch: refs/heads/master
Commit: a5e60126847d40c079ac6d05dc5ece92f62607e9
Parents: a2e0bfc
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:52:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:52:46 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5e60126/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 81d325f..25f9fe7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -97,7 +97,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             @Override
             public void run() {
                 LOGGER.debug("Shutdown hook received.  Beginning shutdown");
-                self.stop();
+                self.stopInternal(true);
             }
         };
     }
@@ -337,12 +337,18 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void stop() {
+        stopInternal(false);
+    }
+
+    protected void stopInternal(boolean systemExiting) {
         try {
             shutdown(tasks);
         } catch (Exception e) {
             forceShutdown(tasks);
         } finally {
-            detachShutdownHandler();
+            if(!systemExiting) {
+                detachShutdownHandler();
+            }
         }
     }
 


[03/11] git commit: STREAMS-131 | Closing connections

Posted by mf...@apache.org.
STREAMS-131 | Closing connections


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78aa10f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78aa10f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78aa10f4

Branch: refs/heads/master
Commit: 78aa10f4f371b1b7aee1c4e21231956dad015268
Parents: a5e6012
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 10:47:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 10:47:29 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 +++
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 4 +++-
 .../org/apache/streams/elasticsearch/ElasticsearchQuery.java     | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4196a46..7ba9b33 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -128,6 +128,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     public void cleanUp() {
         this.shutdownAndAwaitTermination(executor);
         LOGGER.info("PersistReader done");
+        if(elasticsearchQuery != null) {
+            elasticsearchQuery.cleanUp();
+        }
     }
 
     //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 72145e1..caf5ec3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,6 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             refreshIndexes();
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+            manager.stop();
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.
@@ -248,7 +249,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                 Thread.sleep(1);
                 counter++;
             } catch(InterruptedException ie) {
-                // No Operation
+                LOGGER.warn("Catchup was interrupted.  Data may be lost");
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index fbc62c4..4dd3d28 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -247,6 +247,10 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     public void remove() {
     }
 
+    public void cleanUp() {
+        this.elasticsearchClientManager.stop();
+    }
+
     protected boolean isCompleted() {
         return totalRead >= this.limit && hasRecords();
     }


[04/11] git commit: STREAMS-131 | Fixed bug in elasticsearch client manager close

Posted by mf...@apache.org.
STREAMS-131 | Fixed bug in elasticsearch client manager close


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d1fc31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d1fc31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d1fc31a

Branch: refs/heads/master
Commit: 0d1fc31a87d50fed37860667e34b5d0e1fff6b35
Parents: 78aa10f
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:12:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:12:18 2014 -0400

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchClientManager.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d1fc31a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 340d2b9..d107e70 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -80,7 +80,7 @@ public class ElasticsearchClientManager {
     }
 
     public boolean isOnOrAfterVersion(Version version) {
-        return ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getVersion().onOrAfter(version);
+        return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
     }
 
     public void start() throws Exception {
@@ -110,12 +110,12 @@ public class ElasticsearchClientManager {
     public synchronized void stop() {
         // Terminate the elasticsearch cluster
         // Check to see if we have a client.
-        if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.toString())) {
+        if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
             // Close the client
-            ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getClient().close();
+            ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
 
             // Remove it so that it isn't in memory any more.
-            ALL_CLIENTS.remove(this.elasticsearchConfiguration.toString());
+            ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
         }
     }