You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/18 08:33:46 UTC

[1/2] camel git commit: CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting

Repository: camel
Updated Branches:
  refs/heads/master e069d74fc -> b3afcac20


CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee04384f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee04384f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee04384f

Branch: refs/heads/master
Commit: ee04384fea31a23a8e50220530e546983f26fc82
Parents: e069d74
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 18 08:29:58 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 18 08:29:58 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsRequestReplyManualReplyTest.java | 2 ++
 .../component/jms/JmsRequestReplyManualWithJMSReplyToTest.java     | 2 ++
 2 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ee04384f/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualReplyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualReplyTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualReplyTest.java
index fa7127b..e61c343 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualReplyTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualReplyTest.java
@@ -66,6 +66,8 @@ public class JmsRequestReplyManualReplyTest extends CamelTestSupport {
 
     @Test
     public void testManualRequestReply() throws Exception {
+        context.start();
+
         // send using pure JMS API to set a custom JMSReplyTo
         jms.send(new ActiveMQQueue("foo"), new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {

http://git-wip-us.apache.org/repos/asf/camel/blob/ee04384f/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualWithJMSReplyToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualWithJMSReplyToTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualWithJMSReplyToTest.java
index de54d7f..e69487c 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualWithJMSReplyToTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyManualWithJMSReplyToTest.java
@@ -50,6 +50,8 @@ public class JmsRequestReplyManualWithJMSReplyToTest extends CamelTestSupport {
 
     @Test
     public void testManualRequestReply() throws Exception {
+        context.start();
+
         // send an InOnly but force Camel to pass JMSReplyTo
         template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
             public void process(Exchange exchange) throws Exception {


[2/2] camel git commit: CAMEL-8503: camel-jms - Have replyTo options for concurrent consumers

Posted by da...@apache.org.
CAMEL-8503: camel-jms - Have replyTo options for concurrent consumers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3afcac2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3afcac2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3afcac2

Branch: refs/heads/master
Commit: b3afcac209e1c18dac2bc64e9bc043d8c5e4a8ec
Parents: ee04384
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Mar 18 08:35:34 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Mar 18 08:35:34 2015 +0100

----------------------------------------------------------------------
 .../camel/component/jms/JmsComponent.java       |  8 +++++
 .../camel/component/jms/JmsConfiguration.java   | 36 ++++++++++++++++++++
 .../apache/camel/component/jms/JmsEndpoint.java | 20 +++++++++++
 .../component/jms/reply/QueueReplyManager.java  |  6 ++--
 .../jms/reply/TemporaryQueueReplyManager.java   |  6 ++--
 ...uestReplyTempQueueMultipleConsumersTest.java |  3 +-
 6 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index ed2022c..070c44d 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -205,6 +205,10 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon
         getConfiguration().setConcurrentConsumers(concurrentConsumers);
     }
 
+    public void setReplyToConcurrentConsumers(int concurrentConsumers) {
+        getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers);
+    }
+
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         getConfiguration().setConnectionFactory(connectionFactory);
     }
@@ -257,6 +261,10 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon
         getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers);
     }
 
+    public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) {
+        getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers);
+    }
+
     public void setMaxMessagesPerTask(int maxMessagesPerTask) {
         getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 65350d9..6229517 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -97,6 +97,8 @@ public class JmsConfiguration implements Cloneable {
     private boolean pubSubNoLocal;
     @UriParam(defaultValue = "1")
     private int concurrentConsumers = 1;
+    @UriParam(defaultValue = "1")
+    private int replyToConcurrentConsumers = 1;
     @UriParam(defaultValue = "-1")
     private int maxMessagesPerTask = -1;
     private int cacheLevel = -1;
@@ -116,6 +118,8 @@ public class JmsConfiguration implements Cloneable {
     private int idleConsumerLimit = 1;
     @UriParam
     private int maxConcurrentConsumers;
+    @UriParam
+    private int replyToMaxConcurrentConsumers;
     // JmsTemplate only
     @UriParam(defaultValue = "false")
     private Boolean explicitQosEnabled;
@@ -643,10 +647,26 @@ public class JmsConfiguration implements Cloneable {
         return concurrentConsumers;
     }
 
+    /**
+     * Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS).
+     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
+     */
     public void setConcurrentConsumers(int concurrentConsumers) {
         this.concurrentConsumers = concurrentConsumers;
     }
 
+    public int getReplyToConcurrentConsumers() {
+        return replyToConcurrentConsumers;
+    }
+
+    /**
+     * Specifies the default number of concurrent consumers when doing request/reply over JMS.
+     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
+     */
+    public void setReplyToConcurrentConsumers(int replyToConcurrentConsumers) {
+        this.replyToConcurrentConsumers = replyToConcurrentConsumers;
+    }
+
     public int getMaxMessagesPerTask() {
         return maxMessagesPerTask;
     }
@@ -734,10 +754,26 @@ public class JmsConfiguration implements Cloneable {
         return maxConcurrentConsumers;
     }
 
+    /**
+     * Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS).
+     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
+     */
     public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
         this.maxConcurrentConsumers = maxConcurrentConsumers;
     }
 
+    public int getReplyToMaxConcurrentConsumers() {
+        return replyToMaxConcurrentConsumers;
+    }
+
+    /**
+     * Specifies the maximum number of concurrent consumers when using request/reply over JMS.
+     * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads.
+     */
+    public void setReplyToMaxConcurrentConsumers(int replyToMaxConcurrentConsumers) {
+        this.replyToMaxConcurrentConsumers = replyToMaxConcurrentConsumers;
+    }
+
     public boolean isExplicitQosEnabled() {
         return explicitQosEnabled != null ? explicitQosEnabled : false;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 987d17f..d9571a6 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -542,6 +542,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getConfiguration().getConcurrentConsumers();
     }
 
+    @ManagedAttribute
+    public int getReplyToConcurrentConsumers() {
+        return getConfiguration().getReplyToConcurrentConsumers();
+    }
+
     public ConnectionFactory getConnectionFactory() {
         return getConfiguration().getConnectionFactory();
     }
@@ -601,6 +606,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public int getReplyToMaxConcurrentConsumers() {
+        return getConfiguration().getReplyToMaxConcurrentConsumers();
+    }
+
+    @ManagedAttribute
     public int getMaxMessagesPerTask() {
         return getConfiguration().getMaxMessagesPerTask();
     }
@@ -822,6 +832,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         getConfiguration().setConcurrentConsumers(concurrentConsumers);
     }
 
+    @ManagedAttribute
+    public void setReplyToConcurrentConsumers(int concurrentConsumers) {
+        getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers);
+    }
+
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         getConfiguration().setConnectionFactory(connectionFactory);
     }
@@ -897,6 +912,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     }
 
     @ManagedAttribute
+    public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) {
+        getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers);
+    }
+
+    @ManagedAttribute
     public void setMaxMessagesPerTask(int maxMessagesPerTask) {
         getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index ebb91c5..07ddfad 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -175,9 +175,9 @@ public class QueueReplyManager extends ReplyManagerSupport {
         answer.setMessageListener(this);
         answer.setPubSubDomain(false);
         answer.setSubscriptionDurable(false);
-        answer.setConcurrentConsumers(endpoint.getConcurrentConsumers());
-        if (endpoint.getMaxConcurrentConsumers() > 0) {
-            answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
+        answer.setConcurrentConsumers(endpoint.getReplyToConcurrentConsumers());
+        if (endpoint.getReplyToMaxConcurrentConsumers() > 0) {
+            answer.setMaxConcurrentConsumers(endpoint.getReplyToMaxConcurrentConsumers());
         }
         answer.setConnectionFactory(endpoint.getConnectionFactory());
         String clientId = endpoint.getClientId();

http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 123b9cd..0e3d98b 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -110,9 +110,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
         answer.setMessageListener(this);
         answer.setPubSubDomain(false);
         answer.setSubscriptionDurable(false);
-        answer.setConcurrentConsumers(endpoint.getConcurrentConsumers());
-        if (endpoint.getMaxConcurrentConsumers() > 0) {
-            answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers());
+        answer.setConcurrentConsumers(endpoint.getReplyToConcurrentConsumers());
+        if (endpoint.getReplyToMaxConcurrentConsumers() > 0) {
+            answer.setMaxConcurrentConsumers(endpoint.getReplyToMaxConcurrentConsumers());
         }
         answer.setConnectionFactory(endpoint.getConnectionFactory());
         // we use CACHE_CONSUMER by default to cling to the consumer as long as we can, since we can only consume

http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
index cb2a67a..decf610 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java
@@ -37,7 +37,6 @@ import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknow
 
 /**
  * Reliability tests for JMS TempQueue Reply Manager with multiple consumers.
- * @version 
  */
 public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupport {
 
@@ -97,7 +96,7 @@ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupp
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:start").inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() {
+                from("seda:start").inOut("jms:queue:foo?replyToConcurrentConsumers=10&replyToMaxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() {
                     @Override
                     public void process(Exchange exchange) throws Exception {
                         String threadName = Thread.currentThread().getName();