You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/20 05:35:45 UTC
git commit: CAMEL-7723: Support async start and stop for consumers and producers
Repository: camel
Updated Branches:
refs/heads/master 8a48ccf13 -> 11b328236
CAMEL-7723: Support async start and stop for consumers and producers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/11b32823
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/11b32823
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/11b32823
Branch: refs/heads/master
Commit: 11b328236a1e0410c694bacc96830314f46c5869
Parents: 8a48ccf
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Tue Aug 19 23:21:00 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Aug 20 11:08:02 2014 +0800
----------------------------------------------------------------------
.../camel/component/sjms/SjmsComponent.java | 21 ++++++
.../camel/component/sjms/SjmsConsumer.java | 43 ++++++++++-
.../camel/component/sjms/SjmsEndpoint.java | 21 ++++++
.../camel/component/sjms/SjmsProducer.java | 48 +++++++++++-
.../sjms/AsyncStartStopListenerTest.java | 79 ++++++++++++++++++++
5 files changed, 206 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 8489d77..03d4506 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.sjms;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelException;
@@ -46,6 +48,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
private Integer connectionCount = 1;
private TransactionCommitStrategy transactionCommitStrategy;
private TimedTaskManager timedTaskManager;
+ private ExecutorService asyncStartStopExecutorService;
public SjmsComponent() {
super(SjmsEndpoint.class);
@@ -152,6 +155,24 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
super.doStop();
}
+    /**
+     * Shuts down the executor used for async start/stop tasks, if one was ever
+     * created, and then delegates to the parent shutdown.
+     *
+     * @throws Exception propagated from the executor shutdown or the parent shutdown
+     */
+    @Override
+    protected void doShutdown() throws Exception {
+        final ExecutorService startStopPool = asyncStartStopExecutorService;
+        if (startStopPool != null) {
+            // immediate (not graceful) shutdown of the start/stop pool
+            getCamelContext().getExecutorServiceManager().shutdownNow(startStopPool);
+            asyncStartStopExecutorService = null;
+        }
+        super.doShutdown();
+    }
+
+    /**
+     * Returns the executor on which consumers and producers of this component run
+     * their asynchronous start and stop tasks, creating it on first use.
+     *
+     * @return the executor for async start/stop tasks
+     */
+    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
+        if (asyncStartStopExecutorService != null) {
+            return asyncStartStopExecutorService;
+        }
+        // use a cached thread pool for async start/stop tasks: they can run for a while, each one
+        // needs a dedicated thread, and the pool shrinks again when no more tasks are running
+        asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager()
+                .newCachedThreadPool(this, "AsyncStartStopListener");
+        return asyncStartStopExecutorService;
+    }
+
/**
* Sets the ConnectionFactory value of connectionFactory for this instance
* of SjmsComponent.
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 017acbe..3c4b82f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -147,15 +147,52 @@ public class SjmsConsumer extends DefaultConsumer {
super.doStart();
this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
consumers = new MessageConsumerPool();
- consumers.fillPool();
+ if(getEndpoint().isAsyncStartListener()){
+ getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumers.fillPool();
+ } catch (Throwable e) {
+ log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncStartListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ consumers.fillPool();
+ }
}
@Override
protected void doStop() throws Exception {
super.doStop();
if (consumers != null) {
- consumers.drainPool();
- consumers = null;
+ if(getEndpoint().isAsyncStopListener()){
+ getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumers.drainPool();
+ consumers = null;
+ } catch (Throwable e) {
+ log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncStopListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ consumers.drainPool();
+ consumers = null;
+ }
}
if (this.executor != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 63118ec..8558a22 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -70,6 +70,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
private int transactionBatchCount = -1;
@UriParam
private long transactionBatchTimeout = 5000;
+ @UriParam
+ private boolean asyncStartListener;
+ @UriParam
+ private boolean asyncStopListener;
private TransactionCommitStrategy transactionCommitStrategy;
public SjmsEndpoint() {
@@ -448,4 +452,21 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
this.namedReplyTo = namedReplyTo;
this.setExchangePattern(ExchangePattern.InOut);
}
+
+ /**
+ * Sets whether consumers and producers of this endpoint fill their pools on a
+ * background thread during start instead of on the calling thread. Errors raised
+ * by the background fill are logged and otherwise ignored. Defaults to false.
+ *
+ * @param asyncStartListener true to fill the pool asynchronously on start
+ */
+ public void setAsyncStartListener(boolean asyncStartListener) {
+ this.asyncStartListener = asyncStartListener;
+ }
+
+ /**
+ * Sets whether consumers and producers of this endpoint drain their pools on a
+ * background thread during stop instead of on the calling thread. Errors raised
+ * by the background drain are logged and otherwise ignored. Defaults to false.
+ *
+ * @param asyncStopListener true to drain the pool asynchronously on stop
+ */
+ public void setAsyncStopListener(boolean asyncStopListener) {
+ this.asyncStopListener = asyncStopListener;
+ }
+
+ /**
+ * Tells whether pools are filled on a background thread during start.
+ *
+ * @return true if asynchronous start is enabled for this endpoint
+ */
+ public boolean isAsyncStartListener() {
+ return asyncStartListener;
+ }
+
+ /**
+ * Tells whether pools are drained on a background thread during stop.
+ *
+ * @return true if asynchronous stop is enabled for this endpoint
+ */
+ public boolean isAsyncStopListener() {
+ return asyncStopListener;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 8e9e878..64a5914 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -135,7 +135,25 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer");
if (getProducers() == null) {
setProducers(new MessageProducerPool());
- getProducers().fillPool();
+ if(getEndpoint().isAsyncStartListener()){
+ getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ getProducers().fillPool();
+ } catch (Throwable e) {
+ log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncStartListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ getProducers().fillPool();
+ }
}
}
@@ -143,14 +161,38 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
protected void doStop() throws Exception {
    super.doStop();
    if (getProducers() != null) {
-       getProducers().drainPool();
-       setProducers(null);
+       if (getEndpoint().isAsyncStopListener()) {
+           // Detach the pool on the calling thread before scheduling the drain. doStart() only
+           // builds a new pool when getProducers() is null, so leaving the field set until the
+           // background task runs would let a quick restart reuse a pool that is about to be
+           // drained, and a failed drain would leave the stale pool in place forever.
+           // NOTE(review): assumes getProducers() is declared to return MessageProducerPool.
+           final MessageProducerPool stoppingPool = getProducers();
+           setProducers(null);
+           getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+               @Override
+               public void run() {
+                   try {
+                       stoppingPool.drainPool();
+                   } catch (Throwable e) {
+                       // best effort: a failed background drain must not break shutdown
+                       log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                   }
+               }
+
+               @Override
+               public String toString() {
+                   return "AsyncStopListenerTask[" + getDestinationName() + "]";
+               }
+           });
+       } else {
+           // synchronous stop: drain on the calling thread, then forget the pool;
+           // a drain failure propagates to the caller as before
+           getProducers().drainPool();
+           setProducers(null);
+       }
    }
    if (this.executor != null) {
        getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
    }
}
+ /**
+ * Returns this producer's endpoint typed as {@link SjmsEndpoint}, so SJMS-specific
+ * options can be read without a cast at each call site.
+ * NOTE(review): the cast assumes the producer is always created by an SjmsEndpoint.
+ *
+ * @return the endpoint cast to SjmsEndpoint
+ */
+ @Override
+ public SjmsEndpoint getEndpoint() {
+ return (SjmsEndpoint) super.getEndpoint();
+ }
+
public abstract MessageProducerResources doCreateProducerModel() throws Exception;
public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
new file mode 100644
index 0000000..b10698f
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that messages still flow end to end when SJMS consumers and producers
+ * are configured with the asyncStartListener and/or asyncStopListener options.
+ */
+public class AsyncStartStopListenerTest extends JmsTestSupport {
+
+    @Test
+    public void testAsyncStartConsumer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo.start");
+    }
+
+    @Test
+    public void testAsyncStartStopConsumer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo.startstop");
+    }
+
+    @Test
+    public void testAsyncStopConsumer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo.stop");
+    }
+
+    @Test
+    public void testAsyncStopProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true");
+    }
+
+    @Test
+    public void testAsyncStartProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStartListener=true");
+    }
+
+    @Test
+    public void testAsyncStartStopProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true&asyncStartListener=true");
+    }
+
+    /**
+     * Sends two bodies to the given endpoint and asserts that mock:result receives them.
+     *
+     * @param uri the endpoint URI the producer template sends to
+     */
+    private void sendBodyAndAssert(final String uri) throws InterruptedException {
+        String body1 = "Hello World";
+        String body2 = "G'day World";
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived(body1, body2);
+        template.sendBody(uri, body1);
+        template.sendBody(uri, body2);
+        result.assertIsSatisfied();
+    }
+
+    /**
+     * Routes every queue used by the tests to mock:result; the consumer side of each
+     * route carries the async option combination under test.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjms:queue:foo.startstop?asyncStartListener=true&asyncStopListener=true").to("mock:result");
+                from("sjms:queue:foo.start?asyncStartListener=true").to("mock:result");
+                from("sjms:queue:foo.stop?asyncStopListener=true").to("mock:result");
+                from("sjms:queue:foo").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file