You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/17 15:22:01 UTC
[2/3] camel git commit: Merged in sjms-batch component.
Merged in sjms-batch component.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab1d1dd7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab1d1dd7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab1d1dd7
Branch: refs/heads/master
Commit: ab1d1dd78fe53edb50c4ede447e4ac5a55ee2ac9
Parents: 65f9a3a
Author: jkorab <ja...@gmail.com>
Authored: Thu Jul 16 11:32:29 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200
----------------------------------------------------------------------
components/camel-sjms/pom.xml | 6 +-
.../component/sjms/batch/SessionCompletion.java | 61 ++++
.../sjms/batch/SjmsBatchComponent.java | 53 ++++
.../component/sjms/batch/SjmsBatchConsumer.java | 306 +++++++++++++++++++
.../component/sjms/batch/SjmsBatchEndpoint.java | 133 ++++++++
.../sjms/consumer/AbstractMessageHandler.java | 3 +-
.../component/sjms/jms/JmsMessageHelper.java | 21 +-
.../org/apache/camel/component/sjms-batch | 18 ++
.../sjms/batch/EmbeddedActiveMQBroker.java | 74 +++++
.../sjms/batch/ListAggregationStrategy.java | 43 +++
.../sjms/batch/SjmsBatchConsumerTest.java | 247 +++++++++++++++
.../sjms/batch/SjmsBatchEndpointTest.java | 116 +++++++
12 files changed, 1075 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 9778517..6977796 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -32,7 +32,8 @@
<properties>
<camel.osgi.export.pkg>
org.apache.camel.component.sjms,
- org.apache.camel.component.sjms.jms
+ org.apache.camel.component.sjms.jms,
+ org.apache.camel.component.sjms.batch
</camel.osgi.export.pkg>
<camel.osgi.private.pkg>
org.apache.camel.component.sjms.consumer,
@@ -41,7 +42,8 @@
org.apache.camel.component.sjms.tx
</camel.osgi.private.pkg>
<camel.osgi.export.service>
- org.apache.camel.spi.ComponentResolver;component=sjms
+ org.apache.camel.spi.ComponentResolver;component=sjms,
+ org.apache.camel.spi.ComponentResolver;component=sjms-batch
</camel.osgi.export.service>
</properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
new file mode 100644
index 0000000..27f06e6
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * @author jkorab
+ */
+class SessionCompletion implements Synchronization {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private final Session session;
+
+ public SessionCompletion(Session session) {
+ assert (session != null);
+ this.session = session;
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ try {
+ log.debug("Committing");
+ session.commit();
+ } catch (JMSException ex) {
+ log.error("Exception caught while committing: {}", ex.getMessage());
+ exchange.setException(ex);
+ }
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ try {
+ log.debug("Rolling back");
+ session.rollback();
+ } catch (JMSException ex) {
+ log.error("Exception caught while rolling back: {}", ex.getMessage());
+ exchange.setException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
new file mode 100644
index 0000000..04b875d
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.ConnectionFactory;
+import java.util.Map;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchComponent extends UriEndpointComponent {
+
+ private ConnectionFactory connectionFactory;
+
+ public SjmsBatchComponent() {
+ super(SjmsBatchEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ ObjectHelper.notNull(connectionFactory, "connectionFactory is null");
+ SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
+ setProperties(sjmsBatchEndpoint, parameters);
+ return sjmsBatchEndpoint;
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
new file mode 100644
index 0000000..613d471
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumer extends DefaultConsumer {
+ private static final boolean TRANSACTED = true;
+ private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+ private final SjmsBatchEndpoint sjmsBatchEndpoint;
+ private final AggregationStrategy aggregationStrategy;
+
+ private final int completionSize;
+ private final int completionTimeout;
+ private final int consumerCount;
+ private final int pollDuration;
+
+ private final ConnectionFactory connectionFactory;
+ private final String destinationName;
+ private final Processor processor;
+
+ private static AtomicInteger batchCount = new AtomicInteger();
+ private static AtomicLong messagesReceived = new AtomicLong();
+ private static AtomicLong messagesProcessed = new AtomicLong();
+ private ExecutorService jmsConsumerExecutors;
+
+ public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
+ super(sjmsBatchEndpoint, processor);
+
+ this.sjmsBatchEndpoint = ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
+ this.processor = ObjectHelper.notNull(processor, "processor");
+
+ destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
+
+ completionSize = sjmsBatchEndpoint.getCompletionSize();
+ completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+ pollDuration = sjmsBatchEndpoint.getPollDuration();
+ if (pollDuration < 0) {
+ throw new IllegalArgumentException("pollDuration must be 0 or greater");
+ }
+
+ this.aggregationStrategy = ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
+
+ consumerCount = sjmsBatchEndpoint.getConsumerCount();
+ if (consumerCount <= 0) {
+ throw new IllegalArgumentException("consumerCount must be greater than 0");
+ }
+
+ SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent();
+ connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory");
+ }
+
+ @Override
+ public Endpoint getEndpoint() {
+ return sjmsBatchEndpoint;
+ }
+
+ private final AtomicBoolean running = new AtomicBoolean(true);
+ private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+
+ private Connection connection;
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ // start up a shared connection
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ } catch (JMSException ex) {
+ LOG.error("Exception caught closing connection: {}", getStackTrace(ex));
+ return;
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize);
+ }
+ consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+
+ jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+ for (int i = 0; i < consumerCount; i++) {
+ jmsConsumerExecutors.execute(new BatchConsumptionLoop());
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ running.set(false);
+ CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+ if (consumersShutdownLatch != null) {
+ LOG.info("Stop signalled, waiting on consumers to shut down");
+ if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
+ LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+ } else {
+ LOG.info("All consumers have shut down");
+ }
+ } else {
+ LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+ }
+
+ try {
+ LOG.debug("Shutting down JMS connection");
+ connection.close();
+ } catch (JMSException jex) {
+ LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+ }
+
+ getEndpoint().getCamelContext().getExecutorServiceManager()
+ .shutdown(jmsConsumerExecutors);
+ }
+
+ private String getStackTrace(Exception ex) {
+ StringWriter writer = new StringWriter();
+ ex.printStackTrace(new PrintWriter(writer));
+ return writer.toString();
+ }
+
+ private class BatchConsumptionLoop implements Runnable {
+ @Override
+ public void run() {
+ try {
+ // a batch corresponds to a single session that will be committed or rolled back by a background thread
+ final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
+ try {
+ // destinationName only creates queues; there is no additional value to be gained
+ // by transactionally consuming topic messages in batches
+ Queue queue = session.createQueue(destinationName);
+ MessageConsumer consumer = session.createConsumer(queue);
+ try {
+ consumeBatchesOnLoop(session, consumer);
+ } finally {
+ try {
+ consumer.close();
+ } catch (JMSException ex2) {
+ log.error("Exception caught closing consumer: {}", ex2.getMessage());
+
+ }
+ }
+ } finally {
+ try {
+ session.close();
+ } catch (JMSException ex1) {
+ log.error("Exception caught closing session: {}", ex1.getMessage());
+ }
+ }
+ } catch (JMSException ex) {
+ // from loop
+ LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+ } finally {
+ // indicate that we have shut down
+ CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+ consumersShutdownLatch.countDown();
+ }
+ }
+
+ private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+ final boolean usingTimeout = completionTimeout > 0;
+
+ batchConsumption:
+ while (running.get()) {
+ int messageCount = 0;
+
+ // reset the clock counters
+ long timeElapsed = 0;
+ long startTime = 0;
+ Exchange aggregatedExchange = null;
+
+ batch:
+ while ((completionSize <= 0) || (messageCount < completionSize)) {
+ // check periodically to see whether we should be shutting down
+ long waitTime = (usingTimeout && (timeElapsed > 0))
+ ? getReceiveWaitTime(timeElapsed)
+ : pollDuration;
+ Message message = consumer.receive(waitTime);
+
+ if (running.get()) { // no interruptions received
+ if (message == null) {
+ // timed out, no message received
+ LOG.trace("No message received");
+ } else {
+ if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+ startTime = new Date().getTime(); // start counting down the period for this batch
+ }
+ messageCount++;
+ LOG.debug("Message received: {}", messageCount);
+ if ((message instanceof ObjectMessage)
+ || (message instanceof TextMessage)) {
+ Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+ aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
+ aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
+ } else {
+ throw new IllegalArgumentException("Unexpected message type: "
+ + message.getClass().toString());
+ }
+ }
+
+ if ((usingTimeout) && (startTime > 0)) {
+ // a batch has been started, check whether it should be timed out
+ long currentTime = new Date().getTime();
+ timeElapsed = currentTime - startTime;
+
+ if (timeElapsed > completionTimeout) {
+ // batch finished by timeout
+ break batch;
+ }
+ }
+
+ } else {
+ LOG.info("Shutdown signal received - rolling batch back");
+ session.rollback();
+ break batchConsumption;
+ }
+ } // batch
+ assert (aggregatedExchange != null);
+ process(aggregatedExchange, session);
+ }
+ }
+
+ /**
+ * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch.
+ * @param timeElapsed The time that has elapsed.
+ * @return The shorter of the time remaining or poll duration.
+ */
+ private long getReceiveWaitTime(long timeElapsed) {
+ long timeRemaining = getTimeRemaining(timeElapsed);
+
+ // wait for the shorter of the time remaining or the poll duration
+ if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
+ timeRemaining = 1;
+ }
+ final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+
+ LOG.debug("waiting for {}", waitTime);
+ return waitTime;
+ }
+
+ private long getTimeRemaining(long timeElapsed) {
+ long timeRemaining = completionTimeout - timeElapsed;
+ if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+ LOG.debug("Time remaining this batch: {}", timeRemaining);
+ }
+ return timeRemaining;
+ }
+
+ private void process(Exchange exchange, Session session) {
+ assert (exchange != null);
+ int id = batchCount.getAndIncrement();
+ int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize));
+ }
+
+ Synchronization committing = new SessionCompletion(session);
+ exchange.addOnCompletion(committing);
+ try {
+ processor.process(exchange);
+ LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+ } catch (Exception e) {
+ LOG.error("Error processing exchange: {}", e.getMessage());
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
new file mode 100644
index 0000000..afd5cbe
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * @author jkorab
+ */
+@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging")
+public class SjmsBatchEndpoint extends DefaultEndpoint {
+
+ public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
+ public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
+ public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
+
+ @UriPath
+ @Metadata(required = "true")
+ private String destinationName;
+
+ @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
+ private Integer consumerCount = 1;
+
+ @UriParam(label = "consumer", defaultValue = "200",
+ description = "The number of messages consumed at which the batch will be completed")
+ private Integer completionSize = DEFAULT_COMPLETION_SIZE;
+
+ @UriParam(label = "consumer", defaultValue = "500",
+ description = "The timeout from receipt of the first first message when the batch will be completed")
+ private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+
+ @UriParam(label = "consumer", defaultValue = "1000",
+ description = "The duration in milliseconds of each poll for messages. " +
+ "completionTimeOut will be used if it is shorter and a batch has started.")
+ private Integer pollDuration = 1000;
+
+ @Metadata(required = "true")
+ @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+ private AggregationStrategy aggregationStrategy;
+
+ public SjmsBatchEndpoint() {}
+
+ public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
+ super(endpointUri, component);
+ this.destinationName = remaining;
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new SjmsBatchConsumer(this, processor);
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return aggregationStrategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
+
+ public Integer getCompletionSize() {
+ return completionSize;
+ }
+
+ public void setCompletionSize(Integer completionSize) {
+ this.completionSize = completionSize;
+ }
+
+ public Integer getCompletionTimeout() {
+ return completionTimeout;
+ }
+
+ public void setCompletionTimeout(Integer completionTimeout) {
+ this.completionTimeout = completionTimeout;
+ }
+
+ public String getDestinationName() {
+ return destinationName;
+ }
+
+ public void setDestinationName(String destinationName) {
+ this.destinationName = destinationName;
+ }
+
+ public Integer getConsumerCount() {
+ return consumerCount;
+ }
+
+ public void setConsumerCount(Integer consumerCount) {
+ this.consumerCount = consumerCount;
+ }
+
+ public Integer getPollDuration() {
+ return pollDuration;
+ }
+
+ public void setPollDuration(Integer pollDuration) {
+ this.pollDuration = pollDuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index c3c5e55..1598a43 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -70,7 +70,8 @@ public abstract class AbstractMessageHandler implements MessageListener {
public void onMessage(Message message) {
RuntimeCamelException rce = null;
try {
- final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint());
+ SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
+ final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index dcccd9b..79787c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -62,8 +62,23 @@ public final class JmsMessageHelper implements JmsConstants {
}
public static Exchange createExchange(Message message, Endpoint endpoint) {
+ return createExchange(message, endpoint, null);
+ }
+
+ /**
+ * Creates an Exchange from a JMS Message.
+ * @param message The JMS message.
+ * @param endpoint The Endpoint to use to create the Exchange object.
+ * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
+ * format keys in a JMS 1.1 compliant manner. If null the
+ * {@link DefaultJmsKeyFormatStrategy} will be used.
+ * @return Populated Exchange.
+ */
+ public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
Exchange exchange = endpoint.createExchange();
- return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
+ KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
+ ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
+ return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
}
@SuppressWarnings("unchecked")
@@ -222,11 +237,11 @@ public final class JmsMessageHelper implements JmsConstants {
* @param jmsMessage the {@link Message} to add or update the headers on
* @param messageHeaders a {@link Map} of String/Object pairs
* @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
- * format keys in a JMS 1.1 compliant manner. If null the
- * {@link DefaultJmsKeyFormatStrategy} will be used.
+ * format keys in a JMS 1.1 compliant manner.
* @return {@link Message}
*/
private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+
Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
for (final Map.Entry<String, Object> entry : headers.entrySet()) {
String headerName = entry.getKey();
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
new file mode 100644
index 0000000..9ee9e4c
--- /dev/null
+++ b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.sjms.batch.SjmsBatchComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
new file mode 100644
index 0000000..fd1ed27
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JUnit Test aspect that creates an embedded ActiveMQ broker at the beginning of each test and shuts it down after.
+ */
+public class EmbeddedActiveMQBroker extends ExternalResource {
+
+ private final Logger log = LoggerFactory.getLogger(EmbeddedActiveMQBroker.class);
+ private final String brokerId;
+ private BrokerService brokerService;
+ private final String tcpConnectorUri;
+
+ public EmbeddedActiveMQBroker(String brokerId) {
+ if ((brokerId == null) || (brokerId.isEmpty())) {
+ throw new IllegalArgumentException("brokerId is empty");
+ }
+ this.brokerId = brokerId;
+ tcpConnectorUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable();
+
+ brokerService = new BrokerService();
+ brokerService.setBrokerId(brokerId);
+ brokerService.setPersistent(false);
+ brokerService.setUseJmx(false);
+ try {
+ brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ brokerService.addConnector(tcpConnectorUri);
+ } catch (Exception e) {
+ throw new RuntimeException("Problem creating brokerService", e);
+ }
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ log.info("Starting embedded broker[{}] on {}", brokerId, tcpConnectorUri);
+ brokerService.start();
+ }
+
+ @Override
+ protected void after() {
+ try {
+ log.info("Stopping embedded broker[{}]", brokerId);
+ brokerService.stop();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception shutting down broker service", e);
+ }
+ }
+
+ public String getTcpConnectorUri() {
+ return tcpConnectorUri;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
new file mode 100644
index 0000000..39774b2
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class ListAggregationStrategy implements AggregationStrategy {
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ String body = newExchange.getIn().getBody(String.class);
+ if (oldExchange == null) {
+ List<String> list = new ArrayList<String>();
+ list.add(body);
+ newExchange.getIn().setBody(list);
+ return newExchange;
+ } else {
+ List<String> list = oldExchange.getIn().getBody(List.class);
+ list.add(body);
+ return oldExchange;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
new file mode 100644
index 0000000..58fc717
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.StopWatch;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumerTest extends CamelTestSupport {
+ private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+
+ @Rule
+ public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
+
+ @Override
+ public CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("testStrategy", new ListAggregationStrategy());
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+
+ SjmsComponent sjmsComponent = new SjmsComponent();
+ sjmsComponent.setConnectionFactory(connectionFactory);
+
+ SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+ sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addComponent("sjms", sjmsComponent);
+ context.addComponent("sjmsbatch", sjmsBatchComponent);
+ return context;
+ }
+
+ private static class TransactedSendHarness extends RouteBuilder {
+ private final String queueName;
+
+ public TransactedSendHarness(String queueName) {
+ this.queueName = queueName;
+ }
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:in").routeId("harness").startupOrder(20)
+ .split(body())
+ .toF("sjms:queue:%s?transacted=true", queueName)
+ .to("mock:before")
+ .end();
+ }
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test
+ public void testConsumption() throws Exception {
+
+ final int messageCount = 10000;
+ final int consumerCount = 5;
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+
+ int completionTimeout = 1000;
+ int completionSize = 200;
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+ "&consumerCount=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize, consumerCount)
+ .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+ .split(body())
+ .to("mock:split");
+ }
+ });
+ context.start();
+
+ MockEndpoint mockBefore = getMockEndpoint("mock:before");
+ mockBefore.setExpectedMessageCount(messageCount);
+
+ MockEndpoint mockSplit = getMockEndpoint("mock:split");
+ mockSplit.setExpectedMessageCount(messageCount);
+
+ LOG.info("Sending messages");
+ template.sendBody("direct:in", generateStrings(messageCount));
+ LOG.info("Send complete");
+
+ StopWatch stopWatch = new StopWatch();
+ context.startRoute("batchConsumer");
+ assertMockEndpointsSatisfied();
+ long time = stopWatch.stop();
+
+ LOG.info("Processed {} messages in {} ms", messageCount, time);
+ LOG.info("Average throughput {} msg/s", (long) (messageCount / (time / 1000d)));
+ }
+
+ @Test
+ public void testConsumption_completionSize() throws Exception {
+ final int completionSize = 5;
+ final int completionTimeout = -1; // size-based only
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+ .log(LoggingLevel.DEBUG, "${body.size}")
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ int messageCount = 100;
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(messageCount / completionSize);
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ mockBatches.assertIsSatisfied();
+ }
+
+ @Test
+ public void testConsumption_completionTimeout() throws Exception {
+ final int completionTimeout = 2000;
+ final int completionSize = -1; // timeout-based only
+
+ final String queueName = getQueueName();
+ context.addRoutes(new TransactedSendHarness(queueName));
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+ .to("mock:batches");
+ }
+ });
+ context.start();
+
+ int messageCount = 50;
+ assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+ MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+ mockBatches.expectedMessageCount(1); // everything batched together
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ mockBatches.assertIsSatisfied();
+ assertFirstMessageBodyOfLength(mockBatches, messageCount);
+ }
+
+ /**
+ * Checks whether multiple consumer endpoints can operate in parallel.
+ */
+ @Test
+ public void testConsumption_multipleConsumerEndpoints() throws Exception {
+ final int completionTimeout = 2000;
+ final int completionSize = 5;
+
+ final String queueName = getQueueName();
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+
+ from("direct:in")
+ .split().body()
+ .multicast()
+ .toF("sjms:%s", queueName + "A")
+ .toF("sjms:%s", queueName + "B")
+ .end();
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
+ .to("mock:outA");
+
+ fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+ queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
+ .to("mock:outB");
+
+ }
+ });
+ context.start();
+
+ int messageCount = 5;
+
+ assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+ MockEndpoint mockOutA = getMockEndpoint("mock:outA");
+ mockOutA.expectedMessageCount(1); // everything batched together
+ MockEndpoint mockOutB = getMockEndpoint("mock:outB");
+ mockOutB.expectedMessageCount(1); // everything batched together
+
+ template.sendBody("direct:in", generateStrings(messageCount));
+ assertMockEndpointsSatisfied();
+
+ assertFirstMessageBodyOfLength(mockOutA, messageCount);
+ assertFirstMessageBodyOfLength(mockOutB, messageCount);
+ }
+
+ private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
+ Exchange exchange = mockEndpoint.getExchanges().get(0);
+ assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
+ }
+
+ private String getQueueName() {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
+ return "sjmsbatch-" + sdf.format(new Date());
+ }
+
+ private String[] generateStrings(int messageCount) {
+ String[] strings = new String[messageCount];
+ for (int i = 0; i < messageCount; i++) {
+ strings[i] = "message:" + i;
+ }
+ return strings;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
new file mode 100644
index 0000000..7a75e76
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.toolbox.AggregationStrategies;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchEndpointTest extends CamelTestSupport {
+
+ // Create one embedded broker instance for the entire test, as we aren't actually
+ // going to send any messages to it; we just need it so that the ConnectionFactory
+ // has something local to connect to.
+ public static EmbeddedActiveMQBroker broker;
+
+ @BeforeClass
+ public static void setupBroker() {
+ broker = new EmbeddedActiveMQBroker("localhost");
+ try {
+ broker.before();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+
+ @AfterClass
+ public static void shutDownBroker() {
+ broker.after();
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("aggStrategy", AggregationStrategies.groupedExchange());
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+ connectionFactory.setBrokerURL(broker.getTcpConnectorUri());
+
+ SjmsComponent sjmsComponent = new SjmsComponent();
+ sjmsComponent.setConnectionFactory(connectionFactory);
+
+ SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+ sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addComponent("sjmsbatch", sjmsBatchComponent);
+ context.addComponent("sjms", sjmsComponent);
+
+ return context;
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+ public void testProducerFailure() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:in").to("sjmsbatch:testQueue");
+ }
+ });
+ context.start();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumer_negativePollDuration() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+ .to("mock:out");
+ }
+ });
+ context.start();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumer_negativeConsumerCount() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+ .to("mock:out");
+ }
+ });
+ context.start();
+ }
+
+}