You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 17:05:08 UTC

[1/4] camel git commit: ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shutdown ZooKeeperConnectionManager

Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 5c4f00876 -> cb6fc94c4
  refs/heads/master 5e2047aa1 -> bc5828966


ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shutdown ZooKeeperConnectionManager


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/249972fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/249972fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/249972fb

Branch: refs/heads/master
Commit: 249972fb796a7594d7c4574e7896bf9e550eda70
Parents: 5e2047a
Author: Klaus Schröder <kl...@next-audience.com>
Authored: Thu Dec 19 14:13:16 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 16:56:21 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/zookeeper/ZookeeperProducer.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/249972fb/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index d147b4f..aaf368b 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -119,6 +119,15 @@ public class ZookeeperProducer extends DefaultProducer {
         }
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (log.isTraceEnabled()) {
+            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+        }
+        zkm.shutdown();
+    }
+
     private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
         if (log.isDebugEnabled()) {
             log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));


[2/4] camel git commit: ZooKeeper should close connection when stopping producer/consumer. Fixes #72.

Posted by da...@apache.org.
ZooKeeper should close connection when stopping producer/consumer. Fixes #72.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bc582896
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bc582896
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bc582896

Branch: refs/heads/master
Commit: bc58289663cbdf2286fea926fd33579b8811a055
Parents: 249972f
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:18 2015 +0100

----------------------------------------------------------------------
 .../component/zookeeper/ConnectionHolder.java    |  1 +
 .../zookeeper/ZooKeeperConnectionManager.java    |  3 ++-
 .../component/zookeeper/ZooKeeperConsumer.java   | 19 ++++++++-----------
 .../component/zookeeper/ZookeeperProducer.java   | 12 +-----------
 4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
         try {
             if (zookeeper != null) {
                 zookeeper.close();
+                zookeeper = null;
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
 
         public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
             this.configuration = endpoint.getConfiguration();
-            LOG.debug("Creating connection with static configuration of {}", configuration);
+            LOG.debug("Creating connection to ZooKeeper: {}", configuration);
             holder = new ConnectionHolder(configuration);
         }
 
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
         }
 
         public void shutdown() {
+            LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
             holder.closeConnection();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
 @SuppressWarnings("rawtypes")
 public class ZooKeeperConsumer extends DefaultConsumer {
 
-    private ZooKeeperConnectionManager connectionManager;
-
+    private final ZooKeeperConnectionManager zkm;
     private ZooKeeper connection;
-
     private ZooKeeperConfiguration configuration;
-
     private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
-    private boolean shuttingDown;
-
     private ExecutorService executor;
+    private volatile boolean shuttingDown;
 
     public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.connectionManager = endpoint.getConnectionManager();
+        this.zkm = endpoint.getConnectionManager();
         this.configuration = endpoint.getConfiguration();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        connection = connectionManager.getConnection();
+        connection = zkm.getConnection();
         if (log.isDebugEnabled()) {
             log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
         }
 
         initializeConsumer();
         executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
         OperationsExecutor opsService = new OperationsExecutor();
-        executor.execute(opsService);
+        executor.submit(opsService);
     }
 
     @Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
         }
-        executor.shutdown();
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+        zkm.shutdown();
     }
 
     private void initializeConsumer() {

http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
     public static final String ZK_OPERATION_WRITE  = "WRITE";
     public static final String ZK_OPERATION_DELETE = "DELETE";
 
-    private ZooKeeperConfiguration configuration;
-
+    private final ZooKeeperConfiguration configuration;
     private ZooKeeperConnectionManager zkm;
-    
     private ZooKeeper connection;
 
     public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
         }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
-        }
         zkm.shutdown();
     }
 


[4/4] camel git commit: ZooKeeper should close connection when stopping producer/consumer. Fixes #72.

Posted by da...@apache.org.
ZooKeeper should close connection when stopping producer/consumer. Fixes #72.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb6fc94c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb6fc94c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb6fc94c

Branch: refs/heads/camel-2.16.x
Commit: cb6fc94c4b788aaef03f048e3899974393c67422
Parents: 3eee448
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:59 2015 +0100

----------------------------------------------------------------------
 .../component/zookeeper/ConnectionHolder.java    |  1 +
 .../zookeeper/ZooKeeperConnectionManager.java    |  3 ++-
 .../component/zookeeper/ZooKeeperConsumer.java   | 19 ++++++++-----------
 .../component/zookeeper/ZookeeperProducer.java   | 12 +-----------
 4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
         try {
             if (zookeeper != null) {
                 zookeeper.close();
+                zookeeper = null;
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());

http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
 
         public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
             this.configuration = endpoint.getConfiguration();
-            LOG.debug("Creating connection with static configuration of {}", configuration);
+            LOG.debug("Creating connection to ZooKeeper: {}", configuration);
             holder = new ConnectionHolder(configuration);
         }
 
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
         }
 
         public void shutdown() {
+            LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
             holder.closeConnection();
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
 @SuppressWarnings("rawtypes")
 public class ZooKeeperConsumer extends DefaultConsumer {
 
-    private ZooKeeperConnectionManager connectionManager;
-
+    private final ZooKeeperConnectionManager zkm;
     private ZooKeeper connection;
-
     private ZooKeeperConfiguration configuration;
-
     private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
-    private boolean shuttingDown;
-
     private ExecutorService executor;
+    private volatile boolean shuttingDown;
 
     public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.connectionManager = endpoint.getConnectionManager();
+        this.zkm = endpoint.getConnectionManager();
         this.configuration = endpoint.getConfiguration();
     }
 
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        connection = connectionManager.getConnection();
+        connection = zkm.getConnection();
         if (log.isDebugEnabled()) {
             log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
         }
 
         initializeConsumer();
         executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
         OperationsExecutor opsService = new OperationsExecutor();
-        executor.execute(opsService);
+        executor.submit(opsService);
     }
 
     @Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
         }
-        executor.shutdown();
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+        zkm.shutdown();
     }
 
     private void initializeConsumer() {

http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
     public static final String ZK_OPERATION_WRITE  = "WRITE";
     public static final String ZK_OPERATION_DELETE = "DELETE";
 
-    private ZooKeeperConfiguration configuration;
-
+    private final ZooKeeperConfiguration configuration;
     private ZooKeeperConnectionManager zkm;
-    
     private ZooKeeper connection;
 
     public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
         if (log.isTraceEnabled()) {
             log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
         }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
-        }
         zkm.shutdown();
     }
 


[3/4] camel git commit: ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shutdown ZooKeeperConnectionManager

Posted by da...@apache.org.
ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shutdown ZooKeeperConnectionManager


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3eee448b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3eee448b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3eee448b

Branch: refs/heads/camel-2.16.x
Commit: 3eee448bc384491864a5ea53b206a9a3816077fe
Parents: 5c4f008
Author: Klaus Schröder <kl...@next-audience.com>
Authored: Thu Dec 19 14:13:16 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:52 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/zookeeper/ZookeeperProducer.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3eee448b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index d147b4f..aaf368b 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -119,6 +119,15 @@ public class ZookeeperProducer extends DefaultProducer {
         }
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (log.isTraceEnabled()) {
+            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+        }
+        zkm.shutdown();
+    }
+
     private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
         if (log.isDebugEnabled()) {
             log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));