You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/31 12:05:27 UTC

[1/5] git commit: Polished

Updated Branches:
  refs/heads/camel-2.10.x a14bdbf04 -> ef84e968d
  refs/heads/camel-2.11.x 8845baf9a -> bbab8282c
  refs/heads/camel-2.12.x 5f82ba3d5 -> 2e21ec701


Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/759ed807
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/759ed807
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/759ed807

Branch: refs/heads/camel-2.12.x
Commit: 759ed807d4911773831219c84f709469cb55f31c
Parents: 5f82ba3
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 30 16:01:48 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:45:01 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/InvalidPayloadException.java   | 1 +
 .../java/org/apache/camel/InvalidPayloadRuntimeException.java | 7 ++++++-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/759ed807/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java b/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
index dfdb830..ceb3ae2 100644
--- a/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
+++ b/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
@@ -25,6 +25,7 @@ package org.apache.camel;
 @Deprecated
 public class InvalidPayloadException extends CamelExchangeException {
 
+    // TODO: We should have NoSuchBodyException to be consistent with NoSuchHeaderException
     // TODO: We should remove this class in Camel 3.0, and just rely on the other exceptions for type conversion issues
 
     private static final long serialVersionUID = -1689157578733908632L;

http://git-wip-us.apache.org/repos/asf/camel/blob/759ed807/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java b/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
index b4b272c..55dcbde 100644
--- a/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
+++ b/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
@@ -19,9 +19,14 @@ package org.apache.camel;
 /**
  * Runtime version of the {@link InvalidPayloadException}.
  *
- * @version 
+ * @deprecated will be removed in Camel 3.0, use org.apache.camel.util.ObjectHelper#wrapRuntimeCamelException
+ * @version
  */
+@Deprecated
 public class InvalidPayloadRuntimeException extends RuntimeExchangeException {
+
+    // TODO: Use org.apache.camel.util.ObjectHelper#wrapRuntimeCamelException to wrap as runtime
+
     private static final long serialVersionUID = -155083097523464793L;
     private final transient Class<?> type;
 


[5/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

Posted by da...@apache.org.
CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

Conflicts:
	components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
	components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ef84e968
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ef84e968
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ef84e968

Branch: refs/heads/camel-2.10.x
Commit: ef84e968d11fada25e1fb1a86963ecb4d7b1f4ed
Parents: a14bdbf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 12:03:01 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java |  11 +-
 .../apache/camel/component/jms/JmsEndpoint.java |  37 ++++--
 .../jms/reply/ReplyManagerSupport.java          |  12 +-
 .../jms/TwoConsumerOnSameQueueTest.java         | 115 +++++++++++++++++++
 4 files changed, 162 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ef84e968/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/ef84e968/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 509f10e..d18a7f1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -67,6 +67,7 @@ import org.springframework.util.ErrorHandler;
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -74,7 +75,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -431,21 +431,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1125,6 +1143,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ef84e968/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index bb4731c..b183e7b 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/ef84e968/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
new file mode 100644
index 0000000..32ce78c
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
+
+    @Test
+    public void testTwoConsumerOnSameQueue() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testStopAndStartOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop route A
+        context.stopRoute("a");
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        // now start route A
+        context.startRoute("a");
+
+        // send new message should go to both A and B
+        resetMocks();
+
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testRemoveOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop and remove route A
+        context.stopRoute("a");
+        assertTrue(context.removeRoute("a"));
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private void sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert() throws InterruptedException {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+
+        template.sendBody("activemq:queue:foo", "Hello World");
+        template.sendBody("activemq:queue:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); //CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo").routeId("a")
+                     .to("mock:a");
+  
+                from("activemq:queue:foo").routeId("b")
+                     .to("mock:b");
+            }
+        };
+    }
+}


[2/5] git commit: Lets not deprecate InvalidPayloadException as its used for the getMandatoryBody and we would need an exception to be thrown if not body could be retrieved. Though the name of the class could possible be aligned better with the NoSuchHea

Posted by da...@apache.org.
Lets not deprecate InvalidPayloadException as its used for the getMandatoryBody and we would need an exception to be thrown if not body could be retrieved. Though the name of the class could possible be aligned better with the NoSuchHeaderException we have for getMandatoryHeader


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/16d81804
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/16d81804
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/16d81804

Branch: refs/heads/camel-2.12.x
Commit: 16d81804be134cf254ca1ae2698091afdba98f5a
Parents: 759ed80
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:27:14 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:45:07 2013 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/InvalidPayloadException.java   | 7 +------
 .../java/org/apache/camel/InvalidPayloadRuntimeException.java | 4 ----
 2 files changed, 1 insertion(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/16d81804/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java b/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
index ceb3ae2..37e6c3a 100644
--- a/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
+++ b/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
@@ -19,15 +19,10 @@ package org.apache.camel;
 /**
  * Is thrown if the payload from the exchange could not be retrieved because of being null, wrong class type etc.
  *
- * @deprecated will be removed in Camel 3.0
- * @version 
+ * @version
  */
-@Deprecated
 public class InvalidPayloadException extends CamelExchangeException {
 
-    // TODO: We should have NoSuchBodyException to be consistent with NoSuchHeaderException
-    // TODO: We should remove this class in Camel 3.0, and just rely on the other exceptions for type conversion issues
-
     private static final long serialVersionUID = -1689157578733908632L;
     private final transient Class<?> type;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/16d81804/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java b/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
index 55dcbde..2b433bb 100644
--- a/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
+++ b/camel-core/src/main/java/org/apache/camel/InvalidPayloadRuntimeException.java
@@ -19,14 +19,10 @@ package org.apache.camel;
 /**
  * Runtime version of the {@link InvalidPayloadException}.
  *
- * @deprecated will be removed in Camel 3.0, use org.apache.camel.util.ObjectHelper#wrapRuntimeCamelException
  * @version
  */
-@Deprecated
 public class InvalidPayloadRuntimeException extends RuntimeExchangeException {
 
-    // TODO: Use org.apache.camel.util.ObjectHelper#wrapRuntimeCamelException to wrap as runtime
-
     private static final long serialVersionUID = -155083097523464793L;
     private final transient Class<?> type;
 


[3/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

Posted by da...@apache.org.
CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e21ec70
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e21ec70
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e21ec70

Branch: refs/heads/camel-2.12.x
Commit: 2e21ec7012d3ab35ab8c344de0366436e4918bc5
Parents: 16d8180
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:45:13 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java | 11 ++++--
 .../apache/camel/component/jms/JmsEndpoint.java | 37 ++++++++++++++++----
 .../jms/reply/ReplyManagerSupport.java          | 12 +++++--
 .../jms/TwoConsumerOnSameQueueTest.java         |  1 -
 4 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 701de7c..664da7c 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -71,6 +71,7 @@ import org.springframework.util.ErrorHandler;
 @UriEndpoint(scheme = "jms", consumerClass = JmsConsumer.class)
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     @UriParam
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
@@ -82,7 +83,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private String selector;
     @UriParam
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -442,21 +442,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1146,6 +1164,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
index 3cdfd9e..55f73db 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -62,7 +62,6 @@ public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
     }
 
     @Test
-    @Ignore
     public void testRemoveOneRoute() throws Exception {
         sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
 


[4/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

Posted by da...@apache.org.
CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.

Conflicts:
	components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
	components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbab8282
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbab8282
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbab8282

Branch: refs/heads/camel-2.11.x
Commit: bbab8282cd74cfc04195aa500d621871b1e35232
Parents: 8845baf
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Aug 31 11:50:09 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java |  11 +-
 .../apache/camel/component/jms/JmsEndpoint.java |  37 ++++--
 .../jms/reply/ReplyManagerSupport.java          |  12 +-
 .../jms/TwoConsumerOnSameQueueTest.java         | 115 +++++++++++++++++++
 4 files changed, 162 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 45828f4..5f5570c 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -67,6 +67,7 @@ import org.springframework.util.ErrorHandler;
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -74,7 +75,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -434,21 +434,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1138,6 +1156,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/bbab8282/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
new file mode 100644
index 0000000..32ce78c
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
+
+    @Test
+    public void testTwoConsumerOnSameQueue() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testStopAndStartOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop route A
+        context.stopRoute("a");
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        // now start route A
+        context.startRoute("a");
+
+        // send new message should go to both A and B
+        resetMocks();
+
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+    }
+
+    @Test
+    public void testRemoveOneRoute() throws Exception {
+        sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
+
+        // now stop and remove route A
+        context.stopRoute("a");
+        assertTrue(context.removeRoute("a"));
+
+        // send new message should go to B only
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(0);
+        getMockEndpoint("mock:b").expectedBodiesReceived("Bye World", "Bye World");
+
+        template.sendBody("activemq:queue:foo", "Bye World");
+        template.sendBody("activemq:queue:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private void sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert() throws InterruptedException {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+
+        template.sendBody("activemq:queue:foo", "Hello World");
+        template.sendBody("activemq:queue:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); //CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo").routeId("a")
+                     .to("mock:a");
+  
+                from("activemq:queue:foo").routeId("b")
+                     .to("mock:b");
+            }
+        };
+    }
+}