You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/17 15:22:00 UTC
[1/3] camel git commit: Refactored logic around parsing destination
names from the URI. Rejecting topics from batch consumption,
as does not make sense conceptually.
Repository: camel
Updated Branches:
refs/heads/master 65f9a3ab3 -> 832a99c54
Refactored logic around parsing destination names from the URI. Rejecting topics from batch consumption, as does not make sense conceptually.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ba152d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ba152d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ba152d3
Branch: refs/heads/master
Commit: 2ba152d387d23d2f0f4dc2297984d4173b147a5e
Parents: ab1d1dd
Author: jkorab <ja...@gmail.com>
Authored: Fri Jul 17 11:42:47 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200
----------------------------------------------------------------------
.../camel/component/sjms/SjmsComponent.java | 2 +-
.../camel/component/sjms/SjmsEndpoint.java | 22 ++------
.../component/sjms/batch/SjmsBatchConsumer.java | 15 +++---
.../component/sjms/batch/SjmsBatchEndpoint.java | 19 +++++--
.../sjms/jms/DestinationNameParser.java | 36 +++++++++++++
.../sjms/batch/SjmsBatchConsumerTest.java | 49 ++++++++++++++---
.../sjms/batch/SjmsBatchEndpointTest.java | 23 ++++++--
.../sjms/jms/DestinationNameParserTest.java | 56 ++++++++++++++++++++
.../sjms/producer/QueueProducerQoSTest.java | 23 ++++----
9 files changed, 192 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index f2eebc6..2503162 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -61,7 +61,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
validateMepAndReplyTo(parameters);
uri = normalizeUri(uri);
- SjmsEndpoint endpoint = new SjmsEndpoint(uri, this);
+ SjmsEndpoint endpoint = new SjmsEndpoint(uri, this, remaining);
setProperties(endpoint, parameters);
if (endpoint.isTransacted()) {
endpoint.setSynchronous(true);
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 67774d4..0e8d68a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -22,11 +22,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.sjms.jms.ConnectionResource;
-import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+import org.apache.camel.component.sjms.jms.*;
import org.apache.camel.component.sjms.producer.InOnlyProducer;
import org.apache.camel.component.sjms.producer.InOutProducer;
import org.apache.camel.impl.DefaultEndpoint;
@@ -95,19 +91,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
public SjmsEndpoint() {
}
- public SjmsEndpoint(String uri, Component component) {
+ public SjmsEndpoint(String uri, Component component, String remaining) {
super(uri, component);
- if (getEndpointUri().contains("://queue:")) {
- topic = false;
- } else if (getEndpointUri().contains("://topic:")) {
- topic = true;
- } else {
- throw new IllegalArgumentException("Endpoint URI unsupported: " + uri);
- }
- destinationName = getEndpointUri().substring(getEndpointUri().lastIndexOf(":") + 1);
- if (destinationName.contains("?")) {
- destinationName = destinationName.substring(0, destinationName.lastIndexOf("?"));
- }
+ DestinationNameParser parser = new DestinationNameParser();
+ topic = parser.isTopic(remaining);
+ this.destinationName = parser.getShortName(remaining);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 613d471..ca47c7c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,13 +16,11 @@
*/
package org.apache.camel.component.sjms.batch;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +88,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
@Override
- public Endpoint getEndpoint() {
+ public SjmsBatchEndpoint getEndpoint() {
return sjmsBatchEndpoint;
}
@@ -164,10 +162,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
// a batch corresponds to a single session that will be committed or rolled back by a background thread
final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
try {
- // destinationName only creates queues; there is no additional value to be gained
- // by transactionally consuming topic messages in batches
+ // only batch consumption from queues is supported - it makes no sense to transactionally consume
+ // from a topic as you don't car about message loss, users can just use a regular aggregator instead
Queue queue = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(queue);
+
try {
consumeBatchesOnLoop(session, consumer);
} finally {
@@ -289,11 +288,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
int id = batchCount.getAndIncrement();
int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize));
+ LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize));
}
- Synchronization committing = new SessionCompletion(session);
- exchange.addOnCompletion(committing);
+ SessionCompletion sessionCompletion = new SessionCompletion(session);
+ exchange.addOnCompletion(sessionCompletion);
try {
processor.process(exchange);
LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index afd5cbe..5d307a7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -20,6 +20,7 @@ import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.Metadata;
@@ -30,7 +31,10 @@ import org.apache.camel.spi.UriPath;
/**
* @author jkorab
*/
-@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging")
+@UriEndpoint(scheme = "sjmsBatch",
+ title = "Simple JMS Batch Component",
+ syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy",
+ consumerClass = SjmsBatchComponent.class, label = "messaging")
public class SjmsBatchEndpoint extends DefaultEndpoint {
public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
@@ -38,7 +42,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
@UriPath
- @Metadata(required = "true")
+ @Metadata(required = "true",
+ description = "The destination name. Only queues are supported, names may be prefixed by 'queue:'.")
private String destinationName;
@UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
@@ -61,11 +66,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
@UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
private AggregationStrategy aggregationStrategy;
+ private boolean topic;
+
public SjmsBatchEndpoint() {}
public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
super(endpointUri, component);
- this.destinationName = remaining;
+ DestinationNameParser parser = new DestinationNameParser();
+ if (parser.isTopic(remaining)) {
+ throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " +
+ "should use a regular JMS consumer with an aggregator.");
+ }
+ this.destinationName = parser.getShortName(remaining);
}
@Override
@@ -130,4 +142,5 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
public void setPollDuration(Integer pollDuration) {
this.pollDuration = pollDuration;
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
new file mode 100644
index 0000000..d248350
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParser {
+ public boolean isTopic(String destinationName) {
+ if (destinationName == null) {
+ throw new IllegalArgumentException("destinationName is null");
+ }
+ return destinationName.startsWith("topic:");
+ }
+
+ public String getShortName(String destinationName) {
+ if (destinationName == null) {
+ throw new IllegalArgumentException("destinationName is null");
+ }
+ return destinationName.substring(destinationName.lastIndexOf(":") + 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 58fc717..642a38f 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,9 +17,11 @@
package org.apache.camel.component.sjms.batch;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.SjmsComponent;
@@ -60,7 +62,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
CamelContext context = new DefaultCamelContext(registry);
context.addComponent("sjms", sjmsComponent);
- context.addComponent("sjmsbatch", sjmsBatchComponent);
+ context.addComponent("sjms-batch", sjmsBatchComponent);
return context;
}
@@ -100,7 +102,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
int completionTimeout = 1000;
int completionSize = 200;
- fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" +
"&consumerCount=%s&aggregationStrategy=#testStrategy",
queueName, completionTimeout, completionSize, consumerCount)
.routeId("batchConsumer").startupOrder(10).autoStartup(false)
@@ -138,7 +140,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
context.addRoutes(new TransactedSendHarness(queueName));
context.addRoutes(new RouteBuilder() {
public void configure() throws Exception {
- fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
.log(LoggingLevel.DEBUG, "${body.size}")
.to("mock:batches");
@@ -163,7 +165,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
context.addRoutes(new TransactedSendHarness(queueName));
context.addRoutes(new RouteBuilder() {
public void configure() throws Exception {
- fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
.to("mock:batches");
}
@@ -199,11 +201,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
.toF("sjms:%s", queueName + "B")
.end();
- fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
.to("mock:outA");
- fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
.to("mock:outB");
@@ -226,6 +228,39 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
assertFirstMessageBodyOfLength(mockOutB, messageCount);
}
+ @Test
+ public void testConsumption_rollback() throws Exception {
+ final int completionTimeout = 2000;
+ final int completionSize = 5;
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ int messageCount = 5;
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ // the first time around, the batch should throw an exception
+ mockBatches.whenExchangeReceived(1, new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ throw new RuntimeException("Boom!");
+ }
+ });
+ // so the batch should be processed twice due to redelivery
+ mockBatches.expectedMessageCount(2);
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ mockBatches.assertIsSatisfied();
+
+ }
+
private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
Exchange exchange = mockEndpoint.getExchanges().get(0);
assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
@@ -233,7 +268,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
private String getQueueName() {
SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
- return "sjmsbatch-" + sdf.format(new Date());
+ return "sjms-batch-" + sdf.format(new Date());
}
private String[] generateStrings(int messageCount) {
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
index 7a75e76..c97d0dd 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.sjms.batch;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.FailedToCreateRouteException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
@@ -68,7 +70,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
sjmsBatchComponent.setConnectionFactory(connectionFactory);
CamelContext context = new DefaultCamelContext(registry);
- context.addComponent("sjmsbatch", sjmsBatchComponent);
+ context.addComponent("sjms-batch", sjmsBatchComponent);
context.addComponent("sjms", sjmsComponent);
return context;
@@ -79,11 +81,11 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
return true;
}
- @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+ @Test(expected = FailedToCreateProducerException.class)
public void testProducerFailure() throws Exception {
context.addRoutes(new RouteBuilder() {
public void configure() throws Exception {
- from("direct:in").to("sjmsbatch:testQueue");
+ from("direct:in").to("sjms-batch:testQueue");
}
});
context.start();
@@ -94,7 +96,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+ from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
.to("mock:out");
}
});
@@ -106,11 +108,22 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+ from("sjms-batch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
.to("mock:out");
}
});
context.start();
}
+ @Test(expected = FailedToCreateRouteException.class)
+ public void testConsumer_topic() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("sjms-batch:topic:in?aggregationStrategy=#aggStrategy")
+ .to("mock:out");
+ }
+ });
+ context.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
new file mode 100644
index 0000000..3fb4968
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParserTest {
+
+ @Test
+ public void testIsTopic() throws Exception {
+ DestinationNameParser parser = new DestinationNameParser();
+ assertTrue(parser.isTopic("topic:foo"));
+ assertFalse(parser.isTopic("queue:bar"));
+ assertFalse(parser.isTopic("bar"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIsTopic_nullDestinationName() throws Exception {
+ DestinationNameParser parser = new DestinationNameParser();
+ parser.isTopic(null);
+ }
+
+ @Test
+ public void testGetShortName() throws Exception {
+ DestinationNameParser parser = new DestinationNameParser();
+ assertEquals("foo", parser.getShortName("topic:foo"));
+ assertFalse("bar", parser.isTopic("queue:bar"));
+ assertFalse("bar", parser.isTopic("bar"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetShortName_nullDestinationName() throws Exception {
+ DestinationNameParser parser = new DestinationNameParser();
+ parser.getShortName(null);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
index c82842a..beef0a9 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
@@ -16,14 +16,14 @@
*/
package org.apache.camel.component.sjms.producer;
-import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.support.JmsTestSupport;
import org.junit.Test;
@@ -33,12 +33,14 @@ public class QueueProducerQoSTest extends JmsTestSupport {
private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout";
private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute";
+ public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
+
+ @EndpointInject(uri = MOCK_EXPIRED_ADVISORY)
+ MockEndpoint mockExpiredAdvisory;
@Test
public void testInOutQueueProducerTTL() throws Exception {
-
- NotifyBuilder expireMatcher = new NotifyBuilder(context)
- .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+ mockExpiredAdvisory.expectedMessageCount(1);
String endpoint = String.format("sjms:queue:%s?ttl=1000&exchangePattern=InOut&responseTimeOut=500", TEST_INOUT_DESTINATION_NAME);
@@ -51,8 +53,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
// we're just interested in the message becoming expired
}
- // we should delay a bit so broker can run its expiration processes...
- assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+ assertMockEndpointsSatisfied();
DestinationViewMBean queue = getQueueMBean(TEST_INOUT_DESTINATION_NAME);
assertEquals("There were unexpected messages left in the queue: " + TEST_INOUT_DESTINATION_NAME,
@@ -61,14 +62,12 @@ public class QueueProducerQoSTest extends JmsTestSupport {
@Test
public void testInOnlyQueueProducerTTL() throws Exception {
- NotifyBuilder expireMatcher = new NotifyBuilder(context)
- .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+ mockExpiredAdvisory.expectedMessageCount(1);
String endpoint = String.format("sjms:queue:%s?ttl=1000", TEST_INONLY_DESTINATION_NAME);
template.sendBody(endpoint, "test message");
- // we should delay a bit so broker can run its expiration processes...
- assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+ assertMockEndpointsSatisfied();
DestinationViewMBean queue = getQueueMBean(TEST_INONLY_DESTINATION_NAME);
assertEquals("There were unexpected messages left in the queue: " + TEST_INONLY_DESTINATION_NAME,
@@ -102,7 +101,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
public void configure() throws Exception {
from("sjms:topic:ActiveMQ.Advisory.Expired.Queue.>")
.routeId(EXPIRED_MESSAGE_ROUTE_ID)
- .to("mock:expiredAdvisory");
+ .to(MOCK_EXPIRED_ADVISORY);
}
};
}
[2/3] camel git commit: Merged in sjms-batch component.
Posted by da...@apache.org.
Merged in sjms-batch component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab1d1dd7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab1d1dd7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab1d1dd7
Branch: refs/heads/master
Commit: ab1d1dd78fe53edb50c4ede447e4ac5a55ee2ac9
Parents: 65f9a3a
Author: jkorab <ja...@gmail.com>
Authored: Thu Jul 16 11:32:29 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200
----------------------------------------------------------------------
components/camel-sjms/pom.xml | 6 +-
.../component/sjms/batch/SessionCompletion.java | 61 ++++
.../sjms/batch/SjmsBatchComponent.java | 53 ++++
.../component/sjms/batch/SjmsBatchConsumer.java | 306 +++++++++++++++++++
.../component/sjms/batch/SjmsBatchEndpoint.java | 133 ++++++++
.../sjms/consumer/AbstractMessageHandler.java | 3 +-
.../component/sjms/jms/JmsMessageHelper.java | 21 +-
.../org/apache/camel/component/sjms-batch | 18 ++
.../sjms/batch/EmbeddedActiveMQBroker.java | 74 +++++
.../sjms/batch/ListAggregationStrategy.java | 43 +++
.../sjms/batch/SjmsBatchConsumerTest.java | 247 +++++++++++++++
.../sjms/batch/SjmsBatchEndpointTest.java | 116 +++++++
12 files changed, 1075 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 9778517..6977796 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -32,7 +32,8 @@
<properties>
<camel.osgi.export.pkg>
org.apache.camel.component.sjms,
- org.apache.camel.component.sjms.jms
+ org.apache.camel.component.sjms.jms,
+ org.apache.camel.component.sjms.batch
</camel.osgi.export.pkg>
<camel.osgi.private.pkg>
org.apache.camel.component.sjms.consumer,
@@ -41,7 +42,8 @@
org.apache.camel.component.sjms.tx
</camel.osgi.private.pkg>
<camel.osgi.export.service>
- org.apache.camel.spi.ComponentResolver;component=sjms
+ org.apache.camel.spi.ComponentResolver;component=sjms,
+ org.apache.camel.spi.ComponentResolver;component=sjms-batch
</camel.osgi.export.service>
</properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
new file mode 100644
index 0000000..27f06e6
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * @author jkorab
+ */
+class SessionCompletion implements Synchronization {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private final Session session;
+
+ public SessionCompletion(Session session) {
+ assert (session != null);
+ this.session = session;
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ try {
+ log.debug("Committing");
+ session.commit();
+ } catch (JMSException ex) {
+ log.error("Exception caught while committing: {}", ex.getMessage());
+ exchange.setException(ex);
+ }
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ try {
+ log.debug("Rolling back");
+ session.rollback();
+ } catch (JMSException ex) {
+ log.error("Exception caught while rolling back: {}", ex.getMessage());
+ exchange.setException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
new file mode 100644
index 0000000..04b875d
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.ConnectionFactory;
+import java.util.Map;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchComponent extends UriEndpointComponent {
+
+ private ConnectionFactory connectionFactory;
+
+ public SjmsBatchComponent() {
+ super(SjmsBatchEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ ObjectHelper.notNull(connectionFactory, "connectionFactory is null");
+ SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
+ setProperties(sjmsBatchEndpoint, parameters);
+ return sjmsBatchEndpoint;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
new file mode 100644
index 0000000..613d471
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumer extends DefaultConsumer {
+ private static final boolean TRANSACTED = true;
+ private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+ private final SjmsBatchEndpoint sjmsBatchEndpoint;
+ private final AggregationStrategy aggregationStrategy;
+
+ private final int completionSize;
+ private final int completionTimeout;
+ private final int consumerCount;
+ private final int pollDuration;
+
+ private final ConnectionFactory connectionFactory;
+ private final String destinationName;
+ private final Processor processor;
+
+ private static AtomicInteger batchCount = new AtomicInteger();
+ private static AtomicLong messagesReceived = new AtomicLong();
+ private static AtomicLong messagesProcessed = new AtomicLong();
+ private ExecutorService jmsConsumerExecutors;
+
+ public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
+ super(sjmsBatchEndpoint, processor);
+
+ this.sjmsBatchEndpoint = ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
+ this.processor = ObjectHelper.notNull(processor, "processor");
+
+ destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
+
+ completionSize = sjmsBatchEndpoint.getCompletionSize();
+ completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+ pollDuration = sjmsBatchEndpoint.getPollDuration();
+ if (pollDuration < 0) {
+ throw new IllegalArgumentException("pollDuration must be 0 or greater");
+ }
+
+ this.aggregationStrategy = ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
+
+ consumerCount = sjmsBatchEndpoint.getConsumerCount();
+ if (consumerCount <= 0) {
+ throw new IllegalArgumentException("consumerCount must be greater than 0");
+ }
+
+ SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent();
+ connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory");
+ }
+
+ @Override
+ public Endpoint getEndpoint() {
+ return sjmsBatchEndpoint;
+ }
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+ private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+
+ private Connection connection;
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ // start up a shared connection
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ } catch (JMSException ex) {
+ LOG.error("Exception caught closing connection: {}", getStackTrace(ex));
+ return;
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize);
+ }
+ consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+
+ jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+ for (int i = 0; i < consumerCount; i++) {
+ jmsConsumerExecutors.execute(new BatchConsumptionLoop());
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ running.set(false);
+ CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+ if (consumersShutdownLatch != null) {
+ LOG.info("Stop signalled, waiting on consumers to shut down");
+ if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
+ LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+ } else {
+ LOG.info("All consumers have shut down");
+ }
+ } else {
+ LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+ }
+
+ try {
+ LOG.debug("Shutting down JMS connection");
+ connection.close();
+ } catch (JMSException jex) {
+ LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+ }
+
+ getEndpoint().getCamelContext().getExecutorServiceManager()
+ .shutdown(jmsConsumerExecutors);
+ }
+
+ private String getStackTrace(Exception ex) {
+ StringWriter writer = new StringWriter();
+ ex.printStackTrace(new PrintWriter(writer));
+ return writer.toString();
+ }
+
+ private class BatchConsumptionLoop implements Runnable {
+ @Override
+ public void run() {
+ try {
+ // a batch corresponds to a single session that will be committed or rolled back by a background thread
+ final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
+ try {
+ // destinationName only creates queues; there is no additional value to be gained
+ // by transactionally consuming topic messages in batches
+ Queue queue = session.createQueue(destinationName);
+ MessageConsumer consumer = session.createConsumer(queue);
+ try {
+ consumeBatchesOnLoop(session, consumer);
+ } finally {
+ try {
+ consumer.close();
+ } catch (JMSException ex2) {
+ log.error("Exception caught closing consumer: {}", ex2.getMessage());
+
+ }
+ }
+ } finally {
+ try {
+ session.close();
+ } catch (JMSException ex1) {
+ log.error("Exception caught closing session: {}", ex1.getMessage());
+ }
+ }
+ } catch (JMSException ex) {
+ // from loop
+ LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+ } finally {
+ // indicate that we have shut down
+ CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+ consumersShutdownLatch.countDown();
+ }
+ }
+
+ private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+ final boolean usingTimeout = completionTimeout > 0;
+
+ batchConsumption:
+ while (running.get()) {
+ int messageCount = 0;
+
+ // reset the clock counters
+ long timeElapsed = 0;
+ long startTime = 0;
+ Exchange aggregatedExchange = null;
+
+ batch:
+ while ((completionSize <= 0) || (messageCount < completionSize)) {
+ // check periodically to see whether we should be shutting down
+ long waitTime = (usingTimeout && (timeElapsed > 0))
+ ? getReceiveWaitTime(timeElapsed)
+ : pollDuration;
+ Message message = consumer.receive(waitTime);
+
+ if (running.get()) { // no interruptions received
+ if (message == null) {
+ // timed out, no message received
+ LOG.trace("No message received");
+ } else {
+ if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+ startTime = new Date().getTime(); // start counting down the period for this batch
+ }
+ messageCount++;
+ LOG.debug("Message received: {}", messageCount);
+ if ((message instanceof ObjectMessage)
+ || (message instanceof TextMessage)) {
+ Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+ aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
+ aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
+ } else {
+ throw new IllegalArgumentException("Unexpected message type: "
+ + message.getClass().toString());
+ }
+ }
+
+ if ((usingTimeout) && (startTime > 0)) {
+ // a batch has been started, check whether it should be timed out
+ long currentTime = new Date().getTime();
+ timeElapsed = currentTime - startTime;
+
+ if (timeElapsed > completionTimeout) {
+ // batch finished by timeout
+ break batch;
+ }
+ }
+
+ } else {
+ LOG.info("Shutdown signal received - rolling batch back");
+ session.rollback();
+ break batchConsumption;
+ }
+ } // batch
+ assert (aggregatedExchange != null);
+ process(aggregatedExchange, session);
+ }
+ }
+
+ /**
+ * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch.
+ * @param timeElapsed The time that has elapsed.
+ * @return The shorter of the time remaining or poll duration.
+ */
+ private long getReceiveWaitTime(long timeElapsed) {
+ long timeRemaining = getTimeRemaining(timeElapsed);
+
+ // wait for the shorter of the time remaining or the poll duration
+ if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
+ timeRemaining = 1;
+ }
+ final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+
+ LOG.debug("waiting for {}", waitTime);
+ return waitTime;
+ }
+
+ private long getTimeRemaining(long timeElapsed) {
+ long timeRemaining = completionTimeout - timeElapsed;
+ if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+ LOG.debug("Time remaining this batch: {}", timeRemaining);
+ }
+ return timeRemaining;
+ }
+
+ private void process(Exchange exchange, Session session) {
+ assert (exchange != null);
+ int id = batchCount.getAndIncrement();
+ int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize));
+ }
+
+ Synchronization committing = new SessionCompletion(session);
+ exchange.addOnCompletion(committing);
+ try {
+ processor.process(exchange);
+ LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+ } catch (Exception e) {
+ LOG.error("Error processing exchange: {}", e.getMessage());
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
new file mode 100644
index 0000000..afd5cbe
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * @author jkorab
+ */
+@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging")
+public class SjmsBatchEndpoint extends DefaultEndpoint {
+
+ public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
+ public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
+ public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
+
+ @UriPath
+ @Metadata(required = "true")
+ private String destinationName;
+
+ @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
+ private Integer consumerCount = 1;
+
+ @UriParam(label = "consumer", defaultValue = "200",
+ description = "The number of messages consumed at which the batch will be completed")
+ private Integer completionSize = DEFAULT_COMPLETION_SIZE;
+
+ @UriParam(label = "consumer", defaultValue = "500",
+ description = "The timeout from receipt of the first first message when the batch will be completed")
+ private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+
+ @UriParam(label = "consumer", defaultValue = "1000",
+ description = "The duration in milliseconds of each poll for messages. " +
+ "completionTimeOut will be used if it is shorter and a batch has started.")
+ private Integer pollDuration = 1000;
+
+ @Metadata(required = "true")
+ @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+ private AggregationStrategy aggregationStrategy;
+
+ public SjmsBatchEndpoint() {}
+
+ public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
+ super(endpointUri, component);
+ this.destinationName = remaining;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new SjmsBatchConsumer(this, processor);
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return aggregationStrategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
+
+ public Integer getCompletionSize() {
+ return completionSize;
+ }
+
+ public void setCompletionSize(Integer completionSize) {
+ this.completionSize = completionSize;
+ }
+
+ public Integer getCompletionTimeout() {
+ return completionTimeout;
+ }
+
+ public void setCompletionTimeout(Integer completionTimeout) {
+ this.completionTimeout = completionTimeout;
+ }
+
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
+
+ public Integer getConsumerCount() {
+ return consumerCount;
+ }
+
+ public void setConsumerCount(Integer consumerCount) {
+ this.consumerCount = consumerCount;
+ }
+
+ public Integer getPollDuration() {
+ return pollDuration;
+ }
+
+ public void setPollDuration(Integer pollDuration) {
+ this.pollDuration = pollDuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index c3c5e55..1598a43 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -70,7 +70,8 @@ public abstract class AbstractMessageHandler implements MessageListener {
public void onMessage(Message message) {
RuntimeCamelException rce = null;
try {
- final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint());
+ SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
+ final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index dcccd9b..79787c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -62,8 +62,23 @@ public final class JmsMessageHelper implements JmsConstants {
}
public static Exchange createExchange(Message message, Endpoint endpoint) {
+ return createExchange(message, endpoint, null);
+ }
+
+ /**
+ * Creates an Exchange from a JMS Message.
+ * @param message The JMS message.
+ * @param endpoint The Endpoint to use to create the Exchange object.
+ * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
+ * format keys in a JMS 1.1 compliant manner. If null the
+ * {@link DefaultJmsKeyFormatStrategy} will be used.
+ * @return Populated Exchange.
+ */
+ public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
Exchange exchange = endpoint.createExchange();
- return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
+ KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
+ ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
+ return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
}
@SuppressWarnings("unchecked")
@@ -222,11 +237,11 @@ public final class JmsMessageHelper implements JmsConstants {
* @param jmsMessage the {@link Message} to add or update the headers on
* @param messageHeaders a {@link Map} of String/Object pairs
* @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
- * format keys in a JMS 1.1 compliant manner. If null the
- * {@link DefaultJmsKeyFormatStrategy} will be used.
+ * format keys in a JMS 1.1 compliant manner.
* @return {@link Message}
*/
private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+
Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
for (final Map.Entry<String, Object> entry : headers.entrySet()) {
String headerName = entry.getKey();
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
new file mode 100644
index 0000000..9ee9e4c
--- /dev/null
+++ b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.sjms.batch.SjmsBatchComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
new file mode 100644
index 0000000..fd1ed27
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JUnit Test aspect that creates an embedded ActiveMQ broker at the beginning of each test and shuts it down after.
+ */
+public class EmbeddedActiveMQBroker extends ExternalResource {
+
+ private final Logger log = LoggerFactory.getLogger(EmbeddedActiveMQBroker.class);
+ private final String brokerId;
+ private BrokerService brokerService;
+ private final String tcpConnectorUri;
+
+ public EmbeddedActiveMQBroker(String brokerId) {
+ if ((brokerId == null) || (brokerId.isEmpty())) {
+ throw new IllegalArgumentException("brokerId is empty");
+ }
+ this.brokerId = brokerId;
+ tcpConnectorUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable();
+
+ brokerService = new BrokerService();
+ brokerService.setBrokerId(brokerId);
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
+ try {
+ brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ brokerService.addConnector(tcpConnectorUri);
+ } catch (Exception e) {
+ throw new RuntimeException("Problem creating brokerService", e);
+ }
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ log.info("Starting embedded broker[{}] on {}", brokerId, tcpConnectorUri);
+ brokerService.start();
+ }
+
+ @Override
+ protected void after() {
+ try {
+ log.info("Stopping embedded broker[{}]", brokerId);
+ brokerService.stop();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception shutting down broker service", e);
+ }
+ }
+
+ public String getTcpConnectorUri() {
+ return tcpConnectorUri;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
new file mode 100644
index 0000000..39774b2
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class ListAggregationStrategy implements AggregationStrategy {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String body = newExchange.getIn().getBody(String.class);
+ if (oldExchange == null) {
+ List<String> list = new ArrayList<String>();
+ list.add(body);
+ newExchange.getIn().setBody(list);
+ return newExchange;
+ } else {
+ List<String> list = oldExchange.getIn().getBody(List.class);
+ list.add(body);
+ return oldExchange;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
new file mode 100644
index 0000000..58fc717
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.StopWatch;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumerTest extends CamelTestSupport {
+ private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+
+ @Rule
+ public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
+
+ @Override
+ public CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("testStrategy", new ListAggregationStrategy());
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+
+ SjmsComponent sjmsComponent = new SjmsComponent();
+ sjmsComponent.setConnectionFactory(connectionFactory);
+
+ SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+ sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addComponent("sjms", sjmsComponent);
+ context.addComponent("sjmsbatch", sjmsBatchComponent);
+ return context;
+ }
+
+ private static class TransactedSendHarness extends RouteBuilder {
+ private final String queueName;
+
+ public TransactedSendHarness(String queueName) {
+ this.queueName = queueName;
+ }
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:in").routeId("harness").startupOrder(20)
+ .split(body())
+ .toF("sjms:queue:%s?transacted=true", queueName)
+ .to("mock:before")
+ .end();
+ }
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test
+ public void testConsumption() throws Exception {
+
+ final int messageCount = 10000;
+ final int consumerCount = 5;
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+
+ int completionTimeout = 1000;
+ int completionSize = 200;
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+ "&consumerCount=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize, consumerCount)
+ .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+ .split(body())
+ .to("mock:split");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBefore = getMockEndpoint("mock:before");
+ mockBefore.setExpectedMessageCount(messageCount);
+
+ MockEndpoint mockSplit = getMockEndpoint("mock:split");
+ mockSplit.setExpectedMessageCount(messageCount);
+
+ LOG.info("Sending messages");
+ template.sendBody("direct:in", generateStrings(messageCount));
+ LOG.info("Send complete");
+
+ StopWatch stopWatch = new StopWatch();
+ context.startRoute("batchConsumer");
+ assertMockEndpointsSatisfied();
+ long time = stopWatch.stop();
+
+ LOG.info("Processed {} messages in {} ms", messageCount, time);
+ LOG.info("Average throughput {} msg/s", (long) (messageCount / (time / 1000d)));
+ }
+
+ @Test
+ public void testConsumption_completionSize() throws Exception {
+ final int completionSize = 5;
+ final int completionTimeout = -1; // size-based only
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ int messageCount = 100;
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(messageCount / completionSize);
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ mockBatches.assertIsSatisfied();
+ }
+
+ @Test
+ public void testConsumption_completionTimeout() throws Exception {
+ final int completionTimeout = 2000;
+ final int completionSize = -1; // timeout-based only
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ int messageCount = 50;
+ assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1); // everything batched together
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ mockBatches.assertIsSatisfied();
+ assertFirstMessageBodyOfLength(mockBatches, messageCount);
+ }
+
+ /**
+ * Checks whether multiple consumer endpoints can operate in parallel.
+ */
+ @Test
+ public void testConsumption_multipleConsumerEndpoints() throws Exception {
+ final int completionTimeout = 2000;
+ final int completionSize = 5;
+
+ final String queueName = getQueueName();
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+
+ from("direct:in")
+ .split().body()
+ .multicast()
+ .toF("sjms:%s", queueName + "A")
+ .toF("sjms:%s", queueName + "B")
+ .end();
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
+ .to("mock:outA");
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
+ .to("mock:outB");
+
+ }
+ });
+ context.start();
+
+ int messageCount = 5;
+
+ assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+ MockEndpoint mockOutA = getMockEndpoint("mock:outA");
+ mockOutA.expectedMessageCount(1); // everything batched together
+ MockEndpoint mockOutB = getMockEndpoint("mock:outB");
+ mockOutB.expectedMessageCount(1); // everything batched together
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ assertMockEndpointsSatisfied();
+
+ assertFirstMessageBodyOfLength(mockOutA, messageCount);
+ assertFirstMessageBodyOfLength(mockOutB, messageCount);
+ }
+
+ private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
+ Exchange exchange = mockEndpoint.getExchanges().get(0);
+ assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
+ }
+
+ private String getQueueName() {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
+ return "sjmsbatch-" + sdf.format(new Date());
+ }
+
+ private String[] generateStrings(int messageCount) {
+ String[] strings = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ strings[i] = "message:" + i;
+ }
+ return strings;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
new file mode 100644
index 0000000..7a75e76
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.toolbox.AggregationStrategies;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchEndpointTest extends CamelTestSupport {
+
+ // Create one embedded broker instance for the entire test, as we aren't actually
+ // going to send any messages to it; we just need it so that the ConnectionFactory
+ // has something local to connect to.
+ public static EmbeddedActiveMQBroker broker;
+
+ @BeforeClass
+ public static void setupBroker() {
+ broker = new EmbeddedActiveMQBroker("localhost");
+ try {
+ broker.before();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+
+ @AfterClass
+ public static void shutDownBroker() {
+ broker.after();
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("aggStrategy", AggregationStrategies.groupedExchange());
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+ connectionFactory.setBrokerURL(broker.getTcpConnectorUri());
+
+ SjmsComponent sjmsComponent = new SjmsComponent();
+ sjmsComponent.setConnectionFactory(connectionFactory);
+
+ SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+ sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addComponent("sjmsbatch", sjmsBatchComponent);
+ context.addComponent("sjms", sjmsComponent);
+
+ return context;
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+ public void testProducerFailure() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:in").to("sjmsbatch:testQueue");
+ }
+ });
+ context.start();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumer_negativePollDuration() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+ .to("mock:out");
+ }
+ });
+ context.start();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumer_negativeConsumerCount() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+ .to("mock:out");
+ }
+ });
+ context.start();
+ }
+
+}
[3/3] camel git commit: Polished and fixed CS and a few other bits
Posted by da...@apache.org.
Polished and fixed CS and a few other bits
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/832a99c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/832a99c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/832a99c5
Branch: refs/heads/master
Commit: 832a99c546d4fca5510a88136de8fb2002c54d6f
Parents: 2ba152d
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 17 15:25:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 15:25:15 2015 +0200
----------------------------------------------------------------------
.../camel/component/sjms/SjmsEndpoint.java | 7 +-
.../component/sjms/batch/SessionCompletion.java | 22 ++--
.../sjms/batch/SjmsBatchComponent.java | 14 +--
.../component/sjms/batch/SjmsBatchConsumer.java | 79 +++++++-------
.../component/sjms/batch/SjmsBatchEndpoint.java | 104 +++++++++----------
.../sjms/jms/DestinationNameParser.java | 1 +
.../sjms/batch/ListAggregationStrategy.java | 12 +--
.../sjms/batch/SjmsBatchConsumerTest.java | 43 ++++----
.../sjms/batch/SjmsBatchEndpointTest.java | 11 +-
.../sjms/jms/DestinationNameParserTest.java | 4 +-
.../sjms/producer/QueueProducerQoSTest.java | 2 +-
11 files changed, 145 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 0e8d68a..6ffa513 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -22,7 +22,12 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.sjms.jms.*;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
+import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
import org.apache.camel.component.sjms.producer.InOnlyProducer;
import org.apache.camel.component.sjms.producer.InOutProducer;
import org.apache.camel.impl.DefaultEndpoint;
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
index 27f06e6..cae90cb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -16,34 +16,32 @@
*/
package org.apache.camel.component.sjms.batch;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
import org.apache.camel.Exchange;
import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-/**
- * @author jkorab
- */
class SessionCompletion implements Synchronization {
- private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOG = LoggerFactory.getLogger(SessionCompletion.class);
private final Session session;
+ // TODO: add more details in the commit/rollback eg such as message id
+
public SessionCompletion(Session session) {
- assert (session != null);
this.session = session;
}
@Override
public void onComplete(Exchange exchange) {
try {
- log.debug("Committing");
+ LOG.debug("Committing");
session.commit();
} catch (JMSException ex) {
- log.error("Exception caught while committing: {}", ex.getMessage());
+ LOG.error("Exception caught while committing: {}", ex.getMessage());
exchange.setException(ex);
}
}
@@ -51,10 +49,10 @@ class SessionCompletion implements Synchronization {
@Override
public void onFailure(Exchange exchange) {
try {
- log.debug("Rolling back");
+ LOG.debug("Rolling back");
session.rollback();
} catch (JMSException ex) {
- log.error("Exception caught while rolling back: {}", ex.getMessage());
+ LOG.error("Exception caught while rolling back: {}", ex.getMessage());
exchange.setException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 04b875d..421fd8a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.component.sjms.batch;
+import java.util.Map;
+import javax.jms.ConnectionFactory;
+
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.util.ObjectHelper;
-import javax.jms.ConnectionFactory;
-import java.util.Map;
-
-/**
- * @author jkorab
- */
public class SjmsBatchComponent extends UriEndpointComponent {
private ConnectionFactory connectionFactory;
@@ -36,7 +33,7 @@ public class SjmsBatchComponent extends UriEndpointComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- ObjectHelper.notNull(connectionFactory, "connectionFactory is null");
+ ObjectHelper.notNull(connectionFactory, "connectionFactory");
SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
setProperties(sjmsBatchEndpoint, parameters);
return sjmsBatchEndpoint;
@@ -46,6 +43,9 @@ public class SjmsBatchComponent extends UriEndpointComponent {
return connectionFactory;
}
+ /**
+ * A ConnectionFactory is required to enable the SjmsBatchComponent.
+ */
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index ca47c7c..ee2b250 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,16 +16,6 @@
*/
package org.apache.camel.component.sjms.batch;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
@@ -36,30 +26,47 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/**
- * @author jkorab
- */
public class SjmsBatchConsumer extends DefaultConsumer {
private static final boolean TRANSACTED = true;
- private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+ // global counters, maybe they should be on component instead?
+ private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+ private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
+ private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
private final SjmsBatchEndpoint sjmsBatchEndpoint;
private final AggregationStrategy aggregationStrategy;
-
private final int completionSize;
private final int completionTimeout;
private final int consumerCount;
private final int pollDuration;
-
private final ConnectionFactory connectionFactory;
private final String destinationName;
private final Processor processor;
-
- private static AtomicInteger batchCount = new AtomicInteger();
- private static AtomicLong messagesReceived = new AtomicLong();
- private static AtomicLong messagesProcessed = new AtomicLong();
private ExecutorService jmsConsumerExecutors;
+ private final AtomicBoolean running = new AtomicBoolean(true);
+ private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+ private Connection connection;
public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
super(sjmsBatchEndpoint, processor);
@@ -92,11 +99,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
return sjmsBatchEndpoint;
}
- private final AtomicBoolean running = new AtomicBoolean(true);
- private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
-
- private Connection connection;
-
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -145,8 +147,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
}
- getEndpoint().getCamelContext().getExecutorServiceManager()
- .shutdown(jmsConsumerExecutors);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors);
}
private String getStackTrace(Exception ex) {
@@ -174,7 +175,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
consumer.close();
} catch (JMSException ex2) {
log.error("Exception caught closing consumer: {}", ex2.getMessage());
-
}
}
} finally {
@@ -197,7 +197,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
final boolean usingTimeout = completionTimeout > 0;
- batchConsumption:
+ batchConsumption:
while (running.get()) {
int messageCount = 0;
@@ -206,7 +206,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
long startTime = 0;
Exchange aggregatedExchange = null;
- batch:
+ batch:
while ((completionSize <= 0) || (messageCount < completionSize)) {
// check periodically to see whether we should be shutting down
long waitTime = (usingTimeout && (timeElapsed > 0))
@@ -219,7 +219,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
// timed out, no message received
LOG.trace("No message received");
} else {
- if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+ if (usingTimeout && messageCount == 0) { // this is the first message
startTime = new Date().getTime(); // start counting down the period for this batch
}
messageCount++;
@@ -230,12 +230,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
} else {
- throw new IllegalArgumentException("Unexpected message type: "
- + message.getClass().toString());
+ throw new IllegalArgumentException("Unexpected message type: " + message.getClass().toString());
}
}
- if ((usingTimeout) && (startTime > 0)) {
+ if (usingTimeout && startTime > 0) {
// a batch has been started, check whether it should be timed out
long currentTime = new Date().getTime();
timeElapsed = currentTime - startTime;
@@ -252,13 +251,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
break batchConsumption;
}
} // batch
- assert (aggregatedExchange != null);
process(aggregatedExchange, session);
}
}
/**
* Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch.
+ *
* @param timeElapsed The time that has elapsed.
* @return The shorter of the time remaining or poll duration.
*/
@@ -277,25 +276,25 @@ public class SjmsBatchConsumer extends DefaultConsumer {
private long getTimeRemaining(long timeElapsed) {
long timeRemaining = completionTimeout - timeElapsed;
- if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+ if (LOG.isDebugEnabled() && timeElapsed > 0) {
LOG.debug("Time remaining this batch: {}", timeRemaining);
}
return timeRemaining;
}
private void process(Exchange exchange, Session session) {
- assert (exchange != null);
- int id = batchCount.getAndIncrement();
+ int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize));
+ LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + MESSAGE_RECEIVED.addAndGet(batchSize));
}
SessionCompletion sessionCompletion = new SessionCompletion(session);
exchange.addOnCompletion(sessionCompletion);
try {
processor.process(exchange);
- LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+ long total = MESSAGE_PROCESSED.addAndGet(batchSize);
+ LOG.debug("Completed processing[{}]:total={}", id, total);
} catch (Exception e) {
LOG.error("Error processing exchange: {}", e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 5d307a7..b4c052f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -28,12 +28,7 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-/**
- * @author jkorab
- */
-@UriEndpoint(scheme = "sjmsBatch",
- title = "Simple JMS Batch Component",
- syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy",
+@UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName",
consumerClass = SjmsBatchComponent.class, label = "messaging")
public class SjmsBatchEndpoint extends DefaultEndpoint {
@@ -41,41 +36,29 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
- @UriPath
- @Metadata(required = "true",
- description = "The destination name. Only queues are supported, names may be prefixed by 'queue:'.")
+ @UriPath(label = "consumer") @Metadata(required = "true")
private String destinationName;
-
- @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
- private Integer consumerCount = 1;
-
- @UriParam(label = "consumer", defaultValue = "200",
- description = "The number of messages consumed at which the batch will be completed")
- private Integer completionSize = DEFAULT_COMPLETION_SIZE;
-
- @UriParam(label = "consumer", defaultValue = "500",
- description = "The timeout from receipt of the first first message when the batch will be completed")
- private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
-
- @UriParam(label = "consumer", defaultValue = "1000",
- description = "The duration in milliseconds of each poll for messages. " +
- "completionTimeOut will be used if it is shorter and a batch has started.")
- private Integer pollDuration = 1000;
-
- @Metadata(required = "true")
- @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+ @UriParam(label = "consumer", defaultValue = "1")
+ private int consumerCount = 1;
+ @UriParam(label = "consumer", defaultValue = "200")
+ private int completionSize = DEFAULT_COMPLETION_SIZE;
+ @UriParam(label = "consumer", defaultValue = "500")
+ private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+ @UriParam(label = "consumer", defaultValue = "1000")
+ private int pollDuration = 1000;
+ @UriParam(label = "consumer") @Metadata(required = "true")
private AggregationStrategy aggregationStrategy;
- private boolean topic;
-
- public SjmsBatchEndpoint() {}
+ public SjmsBatchEndpoint() {
+ }
public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
super(endpointUri, component);
+
DestinationNameParser parser = new DestinationNameParser();
if (parser.isTopic(remaining)) {
- throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " +
- "should use a regular JMS consumer with an aggregator.");
+ throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you "
+ + "should use a regular JMS consumer with an aggregator.");
}
this.destinationName = parser.getShortName(remaining);
}
@@ -99,47 +82,62 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
return aggregationStrategy;
}
+ /**
+ * The aggregation strategy to use, which merges all the batched messages into a single message
+ */
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
- public Integer getCompletionSize() {
- return completionSize;
- }
-
- public void setCompletionSize(Integer completionSize) {
- this.completionSize = completionSize;
+ /**
+ * The destination name. Only queues are supported, names may be prefixed by 'queue:'.
+ */
+ public String getDestinationName() {
+ return destinationName;
}
- public Integer getCompletionTimeout() {
- return completionTimeout;
+ public int getConsumerCount() {
+ return consumerCount;
}
- public void setCompletionTimeout(Integer completionTimeout) {
- this.completionTimeout = completionTimeout;
+ /**
+ * The number of JMS sessions to consume from
+ */
+ public void setConsumerCount(int consumerCount) {
+ this.consumerCount = consumerCount;
}
- public String getDestinationName() {
- return destinationName;
+ public int getCompletionSize() {
+ return completionSize;
}
- public void setDestinationName(String destinationName) {
- this.destinationName = destinationName;
+ /**
+ * The number of messages consumed at which the batch will be completed
+ */
+ public void setCompletionSize(int completionSize) {
+ this.completionSize = completionSize;
}
- public Integer getConsumerCount() {
- return consumerCount;
+ public int getCompletionTimeout() {
+ return completionTimeout;
}
- public void setConsumerCount(Integer consumerCount) {
- this.consumerCount = consumerCount;
+ /**
+ * The timeout from receipt of the first first message when the batch will be completed
+ */
+ public void setCompletionTimeout(int completionTimeout) {
+ this.completionTimeout = completionTimeout;
}
- public Integer getPollDuration() {
+ public int getPollDuration() {
return pollDuration;
}
- public void setPollDuration(Integer pollDuration) {
+ /**
+ * The duration in milliseconds of each poll for messages.
+ * completionTimeOut will be used if it is shorter and a batch has started.
+ */
+ public void setPollDuration(int pollDuration) {
this.pollDuration = pollDuration;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index d248350..ddc213d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -20,6 +20,7 @@ package org.apache.camel.component.sjms.jms;
* @author jkorab
*/
public class DestinationNameParser {
+
public boolean isTopic(String destinationName) {
if (destinationName == null) {
throw new IllegalArgumentException("destinationName is null");
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
index 39774b2..d1eb6e5 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -16,16 +16,14 @@
*/
package org.apache.camel.component.sjms.batch;
-import org.apache.camel.Exchange;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
import java.util.ArrayList;
import java.util.List;
-/**
- * @author jkorab
- */
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
public class ListAggregationStrategy implements AggregationStrategy {
+
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
String body = newExchange.getIn().getBody(String.class);
@@ -34,7 +32,7 @@ public class ListAggregationStrategy implements AggregationStrategy {
list.add(body);
newExchange.getIn().setBody(list);
return newExchange;
- } else {
+ } else {
List<String> list = oldExchange.getIn().getBody(List.class);
list.add(body);
return oldExchange;
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 642a38f..76c739b 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.camel.component.sjms.batch;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import javax.jms.ConnectionFactory;
+
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
@@ -34,16 +38,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.ConnectionFactory;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-
-/**
- * @author jkorab
- */
public class SjmsBatchConsumerTest extends CamelTestSupport {
- private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
@Rule
public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
@@ -76,10 +72,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
@Override
public void configure() throws Exception {
from("direct:in").routeId("harness").startupOrder(20)
- .split(body())
+ .split(body())
.toF("sjms:queue:%s?transacted=true", queueName)
.to("mock:before")
- .end();
+ .end();
}
}
@@ -102,12 +98,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
int completionTimeout = 1000;
int completionSize = 200;
- fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" +
- "&consumerCount=%s&aggregationStrategy=#testStrategy",
+ fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy",
queueName, completionTimeout, completionSize, consumerCount)
.routeId("batchConsumer").startupOrder(10).autoStartup(false)
- .split(body())
- .to("mock:split");
+ .split(body())
+ .to("mock:split");
}
});
context.start();
@@ -132,7 +127,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
}
@Test
- public void testConsumption_completionSize() throws Exception {
+ public void testConsumptionCompletionSize() throws Exception {
final int completionSize = 5;
final int completionTimeout = -1; // size-based only
@@ -157,7 +152,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
}
@Test
- public void testConsumption_completionTimeout() throws Exception {
+ public void testConsumptionCompletionTimeout() throws Exception {
final int completionTimeout = 2000;
final int completionSize = -1; // timeout-based only
@@ -186,7 +181,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
* Checks whether multiple consumer endpoints can operate in parallel.
*/
@Test
- public void testConsumption_multipleConsumerEndpoints() throws Exception {
+ public void testConsumptionMultipleConsumerEndpoints() throws Exception {
final int completionTimeout = 2000;
final int completionSize = 5;
@@ -196,10 +191,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
from("direct:in")
.split().body()
- .multicast()
- .toF("sjms:%s", queueName + "A")
- .toF("sjms:%s", queueName + "B")
- .end();
+ .multicast()
+ .toF("sjms:%s", queueName + "A")
+ .toF("sjms:%s", queueName + "B")
+ .end();
fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
@@ -229,7 +224,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
}
@Test
- public void testConsumption_rollback() throws Exception {
+ public void testConsumptionRollback() throws Exception {
final int completionTimeout = 2000;
final int completionSize = 5;
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
index c97d0dd..45fe324 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -30,9 +30,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-/**
- * @author jkorab
- */
public class SjmsBatchEndpointTest extends CamelTestSupport {
// Create one embedded broker instance for the entire test, as we aren't actually
@@ -92,19 +89,19 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
}
@Test(expected = IllegalArgumentException.class)
- public void testConsumer_negativePollDuration() throws Exception {
+ public void testConsumerNegativePollDuration() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
- .to("mock:out");
+ .to("mock:out");
}
});
context.start();
}
@Test(expected = IllegalArgumentException.class)
- public void testConsumer_negativeConsumerCount() throws Exception {
+ public void testConsumerNegativeConsumerCount() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
@@ -116,7 +113,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
}
@Test(expected = FailedToCreateRouteException.class)
- public void testConsumer_topic() throws Exception {
+ public void testConsumerTopic() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
index 3fb4968..7f25239 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -34,7 +34,7 @@ public class DestinationNameParserTest {
}
@Test(expected = IllegalArgumentException.class)
- public void testIsTopic_nullDestinationName() throws Exception {
+ public void testIsTopicNullDestinationName() throws Exception {
DestinationNameParser parser = new DestinationNameParser();
parser.isTopic(null);
}
@@ -48,7 +48,7 @@ public class DestinationNameParserTest {
}
@Test(expected = IllegalArgumentException.class)
- public void testGetShortName_nullDestinationName() throws Exception {
+ public void testGetShortNameNullDestinationName() throws Exception {
DestinationNameParser parser = new DestinationNameParser();
parser.getShortName(null);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
index beef0a9..2a925f0 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
@@ -33,7 +33,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout";
private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute";
- public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
+ private static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
@EndpointInject(uri = MOCK_EXPIRED_ADVISORY)
MockEndpoint mockExpiredAdvisory;