You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 17:05:08 UTC
[1/4] camel git commit: ZooKeeperProducer does not stop ZooKeeper
threads if Camel context is stopped. Added overridden doStop() method to
shut down ZooKeeperConnectionManager
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x 5c4f00876 -> cb6fc94c4
refs/heads/master 5e2047aa1 -> bc5828966
ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shut down ZooKeeperConnectionManager
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/249972fb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/249972fb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/249972fb
Branch: refs/heads/master
Commit: 249972fb796a7594d7c4574e7896bf9e550eda70
Parents: 5e2047a
Author: Klaus Schröder <kl...@next-audience.com>
Authored: Thu Dec 19 14:13:16 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 16:56:21 2015 +0100
----------------------------------------------------------------------
.../apache/camel/component/zookeeper/ZookeeperProducer.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/249972fb/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index d147b4f..aaf368b 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -119,6 +119,15 @@ public class ZookeeperProducer extends DefaultProducer {
}
}
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (log.isTraceEnabled()) {
+ log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+ }
+ zkm.shutdown();
+ }
+
private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
if (log.isDebugEnabled()) {
log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
[2/4] camel git commit: ZooKeeper should close connection when
stopping producer/consumer. Fixes #72.
Posted by da...@apache.org.
ZooKeeper should close connection when stopping producer/consumer. Fixes #72.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bc582896
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bc582896
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bc582896
Branch: refs/heads/master
Commit: bc58289663cbdf2286fea926fd33579b8811a055
Parents: 249972f
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:18 2015 +0100
----------------------------------------------------------------------
.../component/zookeeper/ConnectionHolder.java | 1 +
.../zookeeper/ZooKeeperConnectionManager.java | 3 ++-
.../component/zookeeper/ZooKeeperConsumer.java | 19 ++++++++-----------
.../component/zookeeper/ZookeeperProducer.java | 12 +-----------
4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
try {
if (zookeeper != null) {
zookeeper.close();
+ zookeeper = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());
http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
this.configuration = endpoint.getConfiguration();
- LOG.debug("Creating connection with static configuration of {}", configuration);
+ LOG.debug("Creating connection to ZooKeeper: {}", configuration);
holder = new ConnectionHolder(configuration);
}
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
}
public void shutdown() {
+ LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
holder.closeConnection();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
@SuppressWarnings("rawtypes")
public class ZooKeeperConsumer extends DefaultConsumer {
- private ZooKeeperConnectionManager connectionManager;
-
+ private final ZooKeeperConnectionManager zkm;
private ZooKeeper connection;
-
private ZooKeeperConfiguration configuration;
-
private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
- private boolean shuttingDown;
-
private ExecutorService executor;
+ private volatile boolean shuttingDown;
public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- this.connectionManager = endpoint.getConnectionManager();
+ this.zkm = endpoint.getConnectionManager();
this.configuration = endpoint.getConfiguration();
}
@Override
protected void doStart() throws Exception {
super.doStart();
- connection = connectionManager.getConnection();
+ connection = zkm.getConnection();
if (log.isDebugEnabled()) {
log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
}
initializeConsumer();
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
OperationsExecutor opsService = new OperationsExecutor();
- executor.execute(opsService);
+ executor.submit(opsService);
}
@Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
if (log.isTraceEnabled()) {
log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
}
- executor.shutdown();
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+ zkm.shutdown();
}
private void initializeConsumer() {
http://git-wip-us.apache.org/repos/asf/camel/blob/bc582896/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
public static final String ZK_OPERATION_WRITE = "WRITE";
public static final String ZK_OPERATION_DELETE = "DELETE";
- private ZooKeeperConfiguration configuration;
-
+ private final ZooKeeperConfiguration configuration;
private ZooKeeperConnectionManager zkm;
-
private ZooKeeper connection;
public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
if (log.isTraceEnabled()) {
log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
}
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
- if (log.isTraceEnabled()) {
- log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
- }
zkm.shutdown();
}
[4/4] camel git commit: ZooKeeper should close connection when
stopping producer/consumer. Fixes #72.
Posted by da...@apache.org.
ZooKeeper should close connection when stopping producer/consumer. Fixes #72.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cb6fc94c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cb6fc94c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cb6fc94c
Branch: refs/heads/camel-2.16.x
Commit: cb6fc94c4b788aaef03f048e3899974393c67422
Parents: 3eee448
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Dec 30 17:04:18 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:59 2015 +0100
----------------------------------------------------------------------
.../component/zookeeper/ConnectionHolder.java | 1 +
.../zookeeper/ZooKeeperConnectionManager.java | 3 ++-
.../component/zookeeper/ZooKeeperConsumer.java | 19 ++++++++-----------
.../component/zookeeper/ZookeeperProducer.java | 12 +-----------
4 files changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
index 2e69e31..bc92b99 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ConnectionHolder.java
@@ -87,6 +87,7 @@ public class ConnectionHolder implements Watcher {
try {
if (zookeeper != null) {
zookeeper.close();
+ zookeeper = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down connection to Zookeeper cluster {}", configuration.getConnectString());
http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
index cabafdf..3b1ab7f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConnectionManager.java
@@ -50,7 +50,7 @@ public class ZooKeeperConnectionManager {
public DefaultZookeeperConnectionStrategy(ZooKeeperEndpoint endpoint) {
this.configuration = endpoint.getConfiguration();
- LOG.debug("Creating connection with static configuration of {}", configuration);
+ LOG.debug("Creating connection to ZooKeeper: {}", configuration);
holder = new ConnectionHolder(configuration);
}
@@ -59,6 +59,7 @@ public class ZooKeeperConnectionManager {
}
public void shutdown() {
+ LOG.debug("Shutting down connection to ZooKeeper: {}", configuration);
holder.closeConnection();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index 7fe40d5..bb9507f 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -42,36 +42,32 @@ import org.apache.zookeeper.ZooKeeper;
@SuppressWarnings("rawtypes")
public class ZooKeeperConsumer extends DefaultConsumer {
- private ZooKeeperConnectionManager connectionManager;
-
+ private final ZooKeeperConnectionManager zkm;
private ZooKeeper connection;
-
private ZooKeeperConfiguration configuration;
-
private LinkedBlockingQueue<ZooKeeperOperation> operations = new LinkedBlockingQueue<ZooKeeperOperation>();
-
- private boolean shuttingDown;
-
private ExecutorService executor;
+ private volatile boolean shuttingDown;
public ZooKeeperConsumer(ZooKeeperEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- this.connectionManager = endpoint.getConnectionManager();
+ this.zkm = endpoint.getConnectionManager();
this.configuration = endpoint.getConfiguration();
}
@Override
protected void doStart() throws Exception {
super.doStart();
- connection = connectionManager.getConnection();
+ connection = zkm.getConnection();
if (log.isDebugEnabled()) {
log.debug(String.format("Connected to Zookeeper cluster %s", configuration.getConnectString()));
}
initializeConsumer();
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(configuration.getPath(), "Camel-Zookeeper Ops executor", 1);
+
OperationsExecutor opsService = new OperationsExecutor();
- executor.execute(opsService);
+ executor.submit(opsService);
}
@Override
@@ -81,7 +77,8 @@ public class ZooKeeperConsumer extends DefaultConsumer {
if (log.isTraceEnabled()) {
log.trace(String.format("Shutting down zookeeper consumer of '%s'", configuration.getPath()));
}
- executor.shutdown();
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
+ zkm.shutdown();
}
private void initializeConsumer() {
http://git-wip-us.apache.org/repos/asf/camel/blob/cb6fc94c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index aaf368b..2280049 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -52,10 +52,8 @@ public class ZookeeperProducer extends DefaultProducer {
public static final String ZK_OPERATION_WRITE = "WRITE";
public static final String ZK_OPERATION_DELETE = "DELETE";
- private ZooKeeperConfiguration configuration;
-
+ private final ZooKeeperConfiguration configuration;
private ZooKeeperConnectionManager zkm;
-
private ZooKeeper connection;
public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
@@ -117,14 +115,6 @@ public class ZookeeperProducer extends DefaultProducer {
if (log.isTraceEnabled()) {
log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
}
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
- if (log.isTraceEnabled()) {
- log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
- }
zkm.shutdown();
}
[3/4] camel git commit: ZooKeeperProducer does not stop ZooKeeper
threads if Camel context is stopped. Added overridden doStop() method to
shut down ZooKeeperConnectionManager
Posted by da...@apache.org.
ZooKeeperProducer does not stop ZooKeeper threads if Camel context is stopped. Added overridden doStop() method to shut down ZooKeeperConnectionManager
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3eee448b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3eee448b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3eee448b
Branch: refs/heads/camel-2.16.x
Commit: 3eee448bc384491864a5ea53b206a9a3816077fe
Parents: 5c4f008
Author: Klaus Schröder <kl...@next-audience.com>
Authored: Thu Dec 19 14:13:16 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 17:04:52 2015 +0100
----------------------------------------------------------------------
.../apache/camel/component/zookeeper/ZookeeperProducer.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3eee448b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
index d147b4f..aaf368b 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
@@ -119,6 +119,15 @@ public class ZookeeperProducer extends DefaultProducer {
}
}
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (log.isTraceEnabled()) {
+ log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+ }
+ zkm.shutdown();
+ }
+
private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
if (log.isDebugEnabled()) {
log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));