You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/08/20 05:35:45 UTC

git commit: CAMEL-7723: Support async start and stop for consumers and producers

Repository: camel
Updated Branches:
  refs/heads/master 8a48ccf13 -> 11b328236


CAMEL-7723: Support async start and stop for consumers and producers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/11b32823
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/11b32823
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/11b32823

Branch: refs/heads/master
Commit: 11b328236a1e0410c694bacc96830314f46c5869
Parents: 8a48ccf
Author: Cristiano Nicolai <cr...@gmail.com>
Authored: Tue Aug 19 23:21:00 2014 +1000
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Aug 20 11:08:02 2014 +0800

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsComponent.java     | 21 ++++++
 .../camel/component/sjms/SjmsConsumer.java      | 43 ++++++++++-
 .../camel/component/sjms/SjmsEndpoint.java      | 21 ++++++
 .../camel/component/sjms/SjmsProducer.java      | 48 +++++++++++-
 .../sjms/AsyncStartStopListenerTest.java        | 79 ++++++++++++++++++++
 5 files changed, 206 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 8489d77..03d4506 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.sjms;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelException;
@@ -46,6 +48,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
     private Integer connectionCount = 1;
     private TransactionCommitStrategy transactionCommitStrategy;
     private TimedTaskManager timedTaskManager;
+    private ExecutorService asyncStartStopExecutorService;
 
     public SjmsComponent() {
         super(SjmsEndpoint.class);
@@ -152,6 +155,24 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
         super.doStop();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        if (asyncStartStopExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+            asyncStartStopExecutorService = null;
+        }
+        super.doShutdown();
+    }
+
+    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
+        if (asyncStartStopExecutorService == null) {
+            // use a cached thread pool for async start/stop tasks as they can run for a while, and we need a dedicated thread
+            // for each task, and the thread pool will shrink when no more tasks are running
+            asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
+        }
+        return asyncStartStopExecutorService;
+    }
+
     /**
      * Sets the ConnectionFactory value of connectionFactory for this instance
      * of SjmsComponent.

http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 017acbe..3c4b82f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -147,15 +147,52 @@ public class SjmsConsumer extends DefaultConsumer {
         super.doStart();
         this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
         consumers = new MessageConsumerPool();
-        consumers.fillPool();
+        if(getEndpoint().isAsyncStartListener()){
+            getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                    	consumers.fillPool();
+                    } catch (Throwable e) {
+                        log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "AsyncStartListenerTask[" + getDestinationName() + "]";
+                }
+            });
+        } else {
+            consumers.fillPool();
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         if (consumers != null) {
-            consumers.drainPool();
-            consumers = null;
+            if(getEndpoint().isAsyncStopListener()){
+                getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            consumers.drainPool();
+                            consumers = null;
+                        } catch (Throwable e) {
+                            log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "AsyncStopListenerTask[" + getDestinationName() + "]";
+                    }
+                });
+            } else {
+                consumers.drainPool();
+                consumers = null;
+            }
         }
         if (this.executor != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);

http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 63118ec..8558a22 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -70,6 +70,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     private int transactionBatchCount = -1;
     @UriParam
     private long transactionBatchTimeout = 5000;
+    @UriParam
+    private boolean asyncStartListener;
+    @UriParam
+    private boolean asyncStopListener;
     private TransactionCommitStrategy transactionCommitStrategy;
 
     public SjmsEndpoint() {
@@ -448,4 +452,21 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
         this.namedReplyTo = namedReplyTo;
         this.setExchangePattern(ExchangePattern.InOut);
     }
+
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        this.asyncStartListener = asyncStartListener;
+    }
+
+    public void setAsyncStopListener(boolean asyncStopListener) {
+        this.asyncStopListener = asyncStopListener;
+    }
+
+    public boolean isAsyncStartListener() {
+        return asyncStartListener;
+    }
+
+    public boolean isAsyncStopListener() {
+        return asyncStopListener;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 8e9e878..64a5914 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -135,7 +135,25 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer");
         if (getProducers() == null) {
             setProducers(new MessageProducerPool());
-            getProducers().fillPool();
+            if(getEndpoint().isAsyncStartListener()){
+                getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            getProducers().fillPool();
+                        } catch (Throwable e) {
+                            log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "AsyncStartListenerTask[" + getDestinationName() + "]";
+                    }
+                });
+            } else {
+                getProducers().fillPool();
+            }
         }
     }
 
@@ -143,14 +161,38 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
     protected void doStop() throws Exception {
         super.doStop();
         if (getProducers() != null) {
-            getProducers().drainPool();
-            setProducers(null);
+            if(getEndpoint().isAsyncStopListener()){
+                getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            getProducers().drainPool();
+                            setProducers(null);
+                        } catch (Throwable e) {
+                            log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "AsyncStopListenerTask[" + getDestinationName() + "]";
+                    }
+                });
+            } else {
+                getProducers().drainPool();
+                setProducers(null);
+            }
         }
         if (this.executor != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
         }
     }
 
+    @Override
+    public SjmsEndpoint getEndpoint() {
+        return (SjmsEndpoint) super.getEndpoint();
+    }
+
     public abstract MessageProducerResources doCreateProducerModel() throws Exception;
 
     public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;

http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
new file mode 100644
index 0000000..b10698f
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests asynchronous start and stop of the listener for SJMS consumers and producers
+ */
+public class AsyncStartStopListenerTest extends JmsTestSupport {
+
+    @Test
+    public void testAsyncStartConsumer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo.start");
+    }
+
+    @Test
+    public void testAsyncStartStopConsumer() throws Exception {
+       sendBodyAndAssert("sjms:queue:foo.startstop");
+    }
+
+    @Test
+    public void testAsyncStopConsumer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo.stop");
+    }
+
+    @Test
+    public void testAsyncStopProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true");
+    }
+
+    @Test
+    public void testAsyncStartProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStartListener=true");
+    }
+
+    @Test
+    public void testAsyncStartStopProducer() throws Exception {
+        sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true&asyncStartListener=true");
+     }
+
+    private void sendBodyAndAssert(final String uri) throws InterruptedException {
+        String body1 = "Hello World";
+        String body2 = "G'day World";
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived(body1, body2);
+        template.sendBody(uri, body1);
+        template.sendBody(uri, body2);
+        result.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("sjms:queue:foo.startstop?asyncStartListener=true&asyncStopListener=true").to("mock:result");
+                from("sjms:queue:foo.start?asyncStartListener=true").to("mock:result");
+                from("sjms:queue:foo.stop?asyncStopListener=true").to("mock:result");
+                from("sjms:queue:foo").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file