You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/30 05:42:14 UTC
svn commit: r1378796 [1/3] - in /camel/trunk/components/camel-sjms: ./
src/main/java/org/apache/camel/component/sjms/
src/main/java/org/apache/camel/component/sjms/consumer/
src/main/java/org/apache/camel/component/sjms/jms/
src/main/java/org/apache/ca...
Author: ningjiang
Date: Thu Aug 30 03:42:12 2012
New Revision: 1378796
URL: http://svn.apache.org/viewvc?rev=1378796&view=rev
Log:
CAMEL-5497 Add Batch Transaction Support with thanks to Scott
Added:
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java
- copied, changed from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java
- copied, changed from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java
- copied, changed from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java
- copied, changed from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java
Removed:
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java
Modified:
camel/trunk/components/camel-sjms/pom.xml
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
camel/trunk/components/camel-sjms/src/test/resources/log4j.properties
Modified: camel/trunk/components/camel-sjms/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/pom.xml?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/pom.xml (original)
+++ camel/trunk/components/camel-sjms/pom.xml Thu Aug 30 03:42:12 2012
@@ -1,22 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+<project
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -31,6 +34,7 @@
<name>Camel :: SJMS</name>
<description>A pure Java JMS Camel Component</description>
+
<properties>
<camel.osgi.export.pkg>
org.apache.camel.component.sjms,
@@ -46,21 +50,33 @@
org.apache.camel.component.sjms.producer,
org.apache.camel.component.sjms.tx
</camel.osgi.private.pkg>
- </properties>
+ <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
+ </properties>
<dependencies>
-
+ <!-- =============================== -->
+ <!-- Required Dependencies -->
+ <!-- =============================== -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-annotation_1.0_spec</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
- <scope>provided</scope>
+ <version>1.1.1</version>
</dependency>
- <!-- for testing -->
+ <!-- for testing -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
@@ -98,35 +114,64 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
+ <optional>true</optional>
<scope>test</scope>
</dependency>
</dependencies>
<build>
- <plugins>
+ <defaultGoal>install</defaultGoal>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin> <!-- Configure which tests are included/excuded -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
+ <forkMode>pertest</forkMode>
<childDelegation>false</childDelegation>
<useFile>true</useFile>
- <forkMode>pertest</forkMode>
- <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
+ <argLine>-Xmx512M</argLine>
+ <systemProperties>
+ <property>
+ <name>org.apache.activemq.default.directory.prefix</name>
+ <value>target/activemq/</value>
+ </property>
+ </systemProperties>
</configuration>
</plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.8.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
<configuration>
- <filesets>
- <fileset>
- <directory>${basedir}/activemq-data</directory>
- </fileset>
- </filesets>
+ <forkMode>pertest</forkMode>
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <argLine>-Xmx512M</argLine>
+ <systemProperties>
+ <property>
+ <name>org.apache.activemq.default.directory.prefix</name>
+ <value>target/activemq/</value>
+ </property>
+ </systemProperties>
</configuration>
</plugin>
</plugins>
</build>
-
</project>
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link List} of these objects can be used to batch a collection of body and
+ * header pairs in one exchange.
+ *
+ */
+public class BatchMessage<T> {
+ private T payload;
+ private Map<String, Object> headers;
+
+ /**
+ * @param payload may not be null
+ * @param headers may be null
+ */
+ public BatchMessage(T payload, Map<String, Object> headers) {
+ super();
+ if (payload == null) {
+ throw new IllegalArgumentException("Payload may not be null");
+ } else {
+ this.payload = payload;
+ }
+ this.headers = headers;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+
+ public Map<String, Object> getHeaders() {
+ return headers;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((headers == null) ? 0 : headers.hashCode());
+ result = prime * result + ((payload == null) ? 0 : payload.hashCode());
+ return result;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BatchMessage)) {
+ return false;
+ }
+ BatchMessage other = (BatchMessage)obj;
+ if (headers == null) {
+ if (other.headers != null) {
+ return false;
+ }
+ } else if (!headers.equals(other.headers)) {
+ return false;
+ }
+ if (payload == null) {
+ if (other.payload != null) {
+ return false;
+ }
+ } else if (!payload.equals(other.payload)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "BatchMessage [payload=" + payload + ", headers=" + headers + "]";
+ }
+}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java Thu Aug 30 03:42:12 2012
@@ -19,9 +19,10 @@ package org.apache.camel.component.sjms;
/**
* Default strategy that handles dots and hyphens.
* <p/>
- * This can be used for sending keys containg package names that is common by Java frameworks.
- *
- * @version
+ * This can be used for sending keys contain package names that is common by
+ * Java frameworks.
+ *
+ * @version
*/
public class DefaultJmsKeyFormatStrategy implements KeyFormatStrategy {
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java Thu Aug 30 03:42:12 2012
@@ -24,11 +24,13 @@ import org.apache.camel.RuntimeCamelExce
*/
public class IllegalHeaderException extends RuntimeCamelException {
- private static final long serialVersionUID = 1L;
+
+ private static final long serialVersionUID = 3136304415267471091L;
/**
*/
public IllegalHeaderException() {
+ super();
}
/**
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java Thu Aug 30 03:42:12 2012
@@ -17,23 +17,25 @@
package org.apache.camel.component.sjms;
/**
- * Strategy for applying encoding and decoding of JMS headers so they apply to the JMS spec.
- *
- * @version
+ * Strategy for applying encoding and decoding of JMS headers so they apply to
+ * the JMS spec.
+ *
+ * @version
*/
public interface KeyFormatStrategy {
/**
* Encodes the key before its sent as a {@link javax.jms.Message} message.
- *
- * @param key the original key
+ *
+ * @param key the original key
* @return the encoded key
*/
String encodeKey(String key);
/**
- * Decodes the key after its received from a {@link javax.jms.Message} message.
- *
+ * Decodes the key after its received from a {@link javax.jms.Message}
+ * message.
+ *
* @param key the encoded key
* @return the decoded key as the original key
*/
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java Thu Aug 30 03:42:12 2012
@@ -20,26 +20,25 @@ import org.apache.camel.RuntimeCamelExce
/**
* TODO Add Class documentation for MissingHeaderException
- *
*/
public class MissingHeaderException extends RuntimeCamelException {
/**
*
*/
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = -6184009502090347023L;
/**
* TODO Add Constructor Javadoc
- *
*/
public MissingHeaderException() {
+ super();
// TODO Auto-generated constructor stub
}
/**
* TODO Add Constructor Javadoc
- *
+ *
* @param message
* @param cause
*/
@@ -50,7 +49,7 @@ public class MissingHeaderException exte
/**
* TODO Add Constructor Javadoc
- *
+ *
* @param message
*/
public MissingHeaderException(String message) {
@@ -60,7 +59,7 @@ public class MissingHeaderException exte
/**
* TODO Add Constructor Javadoc
- *
+ *
* @param cause
*/
public MissingHeaderException(Throwable cause) {
@@ -68,5 +67,4 @@ public class MissingHeaderException exte
// TODO Auto-generated constructor stub
}
-
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Thu Aug 30 03:42:12 2012
@@ -25,6 +25,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -43,8 +44,9 @@ public class SjmsComponent extends Defau
private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
private KeyFormatStrategy keyFormatStrategy;
private Integer maxConnections = 1;
+ private TransactionCommitStrategy commitStrategy = new DefaultTransactionCommitStrategy();
- /*
+ /**
* @see
* org.apache.camel.impl.DefaultComponent#createEndpoint(java.lang.String,
* java.lang.String, java.util.Map)
@@ -200,4 +202,25 @@ public class SjmsComponent extends Defau
public KeyFormatStrategy getKeyFormatStrategy() {
return keyFormatStrategy;
}
+
+ /**
+ * Gets the TransactionCommitStrategy value of commitStrategy for this
+ * instance of SjmsComponent.
+ *
+ * @return the commitStrategy
+ */
+ public TransactionCommitStrategy getCommitStrategy() {
+ return commitStrategy;
+ }
+
+ /**
+ * Sets the TransactionCommitStrategy value of commitStrategy for this
+ * instance of SjmsComponent.
+ *
+ * @param commitStrategy Sets TransactionCommitStrategy, default is TODO add
+ * default
+ */
+ public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
+ this.commitStrategy = commitStrategy;
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java Thu Aug 30 03:42:12 2012
@@ -18,17 +18,16 @@ package org.apache.camel.component.sjms;
/**
* TODO Add Class documentation for SjmsConstants
- *
*/
public final class SjmsConstants {
public static final String QUEUE_PREFIX = "queue:";
public static final String TOPIC_PREFIX = "topic:";
public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
-
+
public static final String JMS_MESSAGE_TYPE = "JmsMessageType";
public static final String ORIGINAL_MESSAGE = "SjmsOriginalMessage";
-
+
private SjmsConstants() {
// Helper class
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Thu Aug 30 03:42:12 2012
@@ -27,14 +27,13 @@ import org.apache.camel.impl.DefaultCons
/**
* TODO Add Class documentation for SjmsConsumer
- *
*/
public class SjmsConsumer extends DefaultConsumer {
public SjmsConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
-
+
/**
* @return
*/
@@ -49,16 +48,20 @@ public class SjmsConsumer extends Defaul
protected SjmsEndpoint getSjmsEndpoint() {
return (SjmsEndpoint)this.getEndpoint();
}
-
+
protected ConnectionResource getConnectionResource() {
return getSjmsEndpoint().getConnectionResource();
}
-
+
protected SessionPool getSessionPool() {
return getSjmsEndpoint().getSessions();
}
- public boolean isEndpointTransacted() {
+ public int getAcknowledgementMode() {
+ return getSjmsEndpoint().getAcknowledgementMode().intValue();
+ }
+
+ public boolean isTransacted() {
return getSjmsEndpoint().isTransacted();
}
@@ -73,7 +76,7 @@ public class SjmsConsumer extends Defaul
public int getConsumerCount() {
return getSjmsEndpoint().getConsumerCount();
}
-
+
public boolean isTopic() {
return getSjmsEndpoint().isTopic();
}
@@ -85,4 +88,12 @@ public class SjmsConsumer extends Defaul
public String getDurableSubscriptionId() {
return getSjmsEndpoint().getDurableSubscriptionId();
}
+
+ public TransactionCommitStrategy getCommitStrategy() {
+ return getSjmsEndpoint().getCommitStrategy();
+ }
+
+ public int getTransactionBatchCount() {
+ return getSjmsEndpoint().getTransactionBatchCount();
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Thu Aug 30 03:42:12 2012
@@ -31,17 +31,14 @@ import org.apache.camel.component.sjms.p
import org.apache.camel.component.sjms.producer.InOutProducer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.HeaderFilterStrategy;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TODO Add Class documentation for SjmsEndpoint
- *
*/
public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
- protected final transient Logger logger = LoggerFactory
- .getLogger(getClass());
+ protected final transient Logger logger = LoggerFactory.getLogger(getClass());
private SessionPool sessions;
private boolean synchronous = true;
@@ -57,9 +54,11 @@ public class SjmsEndpoint extends Defaul
private String durableSubscriptionId;
private long responseTimeOut = 5000;
private String messageSelector;
-
+ private int transactionBatchCount = -1;
+ private TransactionCommitStrategy commitStrategy;
public SjmsEndpoint() {
+ super();
}
public SjmsEndpoint(String uri, Component component) {
@@ -68,31 +67,31 @@ public class SjmsEndpoint extends Defaul
setTopic(false);
} else if (getEndpointUri().indexOf("://topic:") > -1) {
setTopic(true);
- } else {
+ } else {
throw new RuntimeCamelException("Endpoint URI unsupported: " + uri);
}
}
-
+
@Override
protected void doStart() throws Exception {
super.doStart();
-
+
//
- // TODO since we only need a session pool for one use case, find a better way
+ // TODO since we only need a session pool for one use case, find a
+ // better way
//
// We only create a session pool when we are not transacted.
// Transacted listeners or producers need to be paired with the
// Session that created them.
- if (!isTransacted()) {
+ if (!isTransacted() && getExchangePattern().equals(ExchangePattern.InOnly)) {
sessions = new SessionPool(getSessionCount(), getConnectionResource());
-
+
// TODO fix the string hack
- sessions.setAcknowledgeMode(SessionAcknowledgementType
- .valueOf(getAcknowledgementMode() + ""));
+ sessions.setAcknowledgeMode(SessionAcknowledgementType.valueOf(getAcknowledgementMode() + ""));
getSessions().fillPool();
}
}
-
+
@Override
protected void doStop() throws Exception {
if (getSessions() != null) {
@@ -116,12 +115,12 @@ public class SjmsEndpoint extends Defaul
public Consumer createConsumer(Processor processor) throws Exception {
return new DefaultConsumer(this, processor);
}
-
+
@Override
public boolean isMultipleConsumersSupported() {
return true;
}
-
+
@Override
public boolean isSingleton() {
return true;
@@ -136,7 +135,7 @@ public class SjmsEndpoint extends Defaul
}
public SjmsComponent getSjmsComponent() {
- return (SjmsComponent) this.getComponent();
+ return (SjmsComponent)this.getComponent();
}
public ConnectionResource getConnectionResource() {
@@ -262,4 +261,20 @@ public class SjmsEndpoint extends Defaul
public String getMessageSelector() {
return messageSelector;
}
+
+ public TransactionCommitStrategy getCommitStrategy() {
+ return commitStrategy;
+ }
+
+ public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
+ this.commitStrategy = commitStrategy;
+ }
+
+ public int getTransactionBatchCount() {
+ return transactionBatchCount;
+ }
+
+ public void setTransactionBatchCount(int transactionBatchCount) {
+ this.transactionBatchCount = transactionBatchCount;
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java Thu Aug 30 03:42:12 2012
@@ -19,7 +19,7 @@ package org.apache.camel.component.sjms;
import org.apache.camel.impl.DefaultHeaderFilterStrategy;
/**
- * @version
+ * @version
*/
public class SjmsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
@@ -28,8 +28,10 @@ public class SjmsHeaderFilterStrategy ex
}
protected void initialize() {
- // ignore provider specified JMS extension headers see page 39 of JMS 1.1 specification
- // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in AqjmsMessage
+ // ignore provider specified JMS extension headers see page 39 of JMS
+ // 1.1 specification
+ // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in
+ // AqjmsMessage
getOutFilter().add("JMSXUserID");
getOutFilter().add("JMSXAppID");
getOutFilter().add("JMSXDeliveryCount");
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java Thu Aug 30 03:42:12 2012
@@ -26,11 +26,13 @@ import org.apache.camel.component.sjms.j
/**
* TODO Add Class documentation for SjmsMessageConsumer
- *
*/
public interface SjmsMessageConsumer extends MessageListener {
void handleMessage(Message message);
+
SjmsMessageConsumer createMessageConsumer(ConnectionResource connectionResource, String destinationName) throws Exception;
+
SjmsMessageConsumer createMessageConsumerListener(SessionPool sessionPool, String destinationName, Exchanger<Object> exchanger) throws Exception;
+
void destroyMessageConsumer() throws Exception;
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,10 @@
*/
package org.apache.camel.component.sjms;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import javax.jms.MessageProducer;
@@ -31,14 +35,12 @@ import org.apache.camel.util.ObjectHelpe
/**
* Base SjmsProducer class.
- *
*/
-public abstract class SjmsProducer extends DefaultAsyncProducer {
+public abstract class SjmsProducer extends DefaultAsyncProducer {
-
/**
- * The {@link MessageProducerResources} pool for all {@link SjmsProducer} classes.
- *
+ * The {@link MessageProducerResources} pool for all {@link SjmsProducer}
+ * classes.
*/
protected class MessageProducerPool extends ObjectPool<MessageProducerResources> {
@@ -50,7 +52,7 @@ public abstract class SjmsProducer exten
protected MessageProducerResources createObject() throws Exception {
return doCreateProducerModel();
}
-
+
@Override
protected void destroyObject(MessageProducerResources model) throws Exception {
if (model.getMessageProducer() != null) {
@@ -73,23 +75,25 @@ public abstract class SjmsProducer exten
}
}
}
-
+
/**
- * The {@link MessageProducer} resources for all {@link SjmsProducer} classes.
+ * The {@link MessageProducer} resources for all {@link SjmsProducer}
+ * classes.
*/
protected class MessageProducerResources {
private final Session session;
private final MessageProducer messageProducer;
+ private final TransactionCommitStrategy commitStrategy;
- /**
- * TODO Add Constructor Javadoc
- *
- * @param session
- * @param messageProducer
- */
public MessageProducerResources(Session session, MessageProducer messageProducer) {
+ this(session, messageProducer, null);
+ }
+
+ public MessageProducerResources(Session session, MessageProducer messageProducer, TransactionCommitStrategy commitStrategy) {
+ super();
this.session = session;
this.messageProducer = messageProducer;
+ this.commitStrategy = commitStrategy;
}
/**
@@ -111,8 +115,18 @@ public abstract class SjmsProducer exten
public MessageProducer getMessageProducer() {
return messageProducer;
}
+
+ /**
+ * Gets the TransactionCommitStrategy value of commitStrategy for this
+ * instance of SjmsProducer.MessageProducerResources.
+ *
+ * @return the commitStrategy
+ */
+ public TransactionCommitStrategy getCommitStrategy() {
+ return commitStrategy;
+ }
}
-
+
private MessageProducerPool producers;
private final ExecutorService executor;
@@ -135,24 +149,29 @@ public abstract class SjmsProducer exten
super.doStop();
if (getProducers() != null) {
getProducers().drainPool();
- setProducers(null);
+ setProducers(null);
}
}
-
+
public abstract MessageProducerResources doCreateProducerModel() throws Exception;
-
+
public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;
-
+
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- if (log.isDebugEnabled()) {
- log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+ log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+
+ Object body = exchange.getIn().getBody();
+ if (body != null) {
+ if (body instanceof InputStream) {
+ byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
+ exchange.getIn().setBody(bytes);
+ }
}
+
try {
if (!isSynchronous()) {
- if (log.isDebugEnabled()) {
- log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody());
- }
+ log.debug(" Sending message asynchronously: {}", exchange.getIn().getBody());
getExecutor().execute(new Runnable() {
@Override
public void run() {
@@ -164,37 +183,70 @@ public abstract class SjmsProducer exten
}
});
} else {
- if (log.isDebugEnabled()) {
- log.debug(" Sending message synchronously: {}", exchange.getIn().getBody());
- }
+ log.debug(" Sending message synchronously: {}", exchange.getIn().getBody());
sendMessage(exchange, callback);
}
} catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
- }
- if (log.isTraceEnabled()) {
- log.trace("Exception: " + e.getLocalizedMessage(), e);
- }
+ log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
+ log.trace("Exception: " + e.getLocalizedMessage(), e);
exchange.setException(e);
}
- if (log.isDebugEnabled()) {
- log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
- }
+ log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
+
return isSynchronous();
}
+
+ public static byte[] getBytes(InputStream is) throws IOException {
+ int len;
+ int size = 1024;
+ byte[] buf;
+
+ if (is instanceof ByteArrayInputStream) {
+ size = is.available();
+ buf = new byte[size];
+ len = is.read(buf, 0, size);
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ buf = new byte[size];
+ while ((len = is.read(buf, 0, size)) != -1) {
+ bos.write(buf, 0, len);
+ }
+ buf = bos.toByteArray();
+ }
+ return buf;
+ }
+
+// public static byte[] getBytesFromStream(InputStream is) throws IOException {
+// BufferedInputStream bis = new BufferedInputStream(is);
+// bis.available();
+//
+// // Create the byte array to hold the data
+// byte[] bytes = new byte[(int)bis.available()];
+//
+// // Read in the bytes
+// int offset = 0;
+// int numRead = 0;
+// while (offset < bytes.length && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
+// offset += numRead;
+// }
+//
+// // Close the input stream and return bytes
+// is.close();
+// return bytes;
+// }
+
protected SjmsEndpoint getSjmsEndpoint() {
return (SjmsEndpoint)this.getEndpoint();
}
-
+
protected ConnectionResource getConnectionResource() {
return getSjmsEndpoint().getConnectionResource();
}
-
+
/**
* Gets the acknowledgment mode for this instance of DestinationProducer.
- *
+ *
* @return int
*/
public int getAcknowledgeMode() {
@@ -203,8 +255,8 @@ public abstract class SjmsProducer exten
/**
* Gets the synchronous value for this instance of DestinationProducer.
- *
- * @return true if synchronous, otherwise false
+ *
+ * @return true if synchronous, otherwise false
*/
public boolean isSynchronous() {
return getSjmsEndpoint().isSynchronous();
@@ -212,7 +264,7 @@ public abstract class SjmsProducer exten
/**
* Gets the replyTo for this instance of DestinationProducer.
- *
+ *
* @return String
*/
public String getReplyTo() {
@@ -221,7 +273,7 @@ public abstract class SjmsProducer exten
/**
* Gets the destinationName for this instance of DestinationProducer.
- *
+ *
* @return String
*/
public String getDestinationName() {
@@ -230,7 +282,7 @@ public abstract class SjmsProducer exten
/**
* Sets the producer pool for this instance of SjmsProducer.
- *
+ *
* @param producers A MessageProducerPool
*/
public void setProducers(MessageProducerPool producers) {
@@ -238,8 +290,9 @@ public abstract class SjmsProducer exten
}
/**
- * Gets the MessageProducerPool value of producers for this instance of SjmsProducer.
- *
+ * Gets the MessageProducerPool value of producers for this instance of
+ * SjmsProducer.
+ *
* @return the producers
*/
public MessageProducerPool getProducers() {
@@ -275,7 +328,7 @@ public abstract class SjmsProducer exten
/**
* Gets the producerCount for this instance of SjmsProducer.
- *
+ *
* @return int
*/
public int getProducerCount() {
@@ -284,7 +337,7 @@ public abstract class SjmsProducer exten
/**
* Gets consumerCount for this instance of SjmsProducer.
- *
+ *
* @return int
*/
public int getConsumerCount() {
@@ -293,7 +346,7 @@ public abstract class SjmsProducer exten
/**
* Gets the executor for this instance of SjmsProducer.
- *
+ *
* @return ExecutorService
*/
public ExecutorService getExecutor() {
@@ -302,30 +355,38 @@ public abstract class SjmsProducer exten
/**
* Gets the ttl for this instance of SjmsProducer.
- *
+ *
* @return long
*/
public long getTtl() {
- return getSjmsEndpoint().getTtl();
+ return getSjmsEndpoint().getTtl();
}
/**
* Gets the boolean value of persistent for this instance of SjmsProducer.
- *
+ *
* @return true if persistent, otherwise false
*/
public boolean isPersistent() {
- return getSjmsEndpoint().isPersistent();
+ return getSjmsEndpoint().isPersistent();
}
-
/**
* Gets responseTimeOut for this instance of SjmsProducer.
- *
+ *
* @return long
*/
public long getResponseTimeOut() {
return getSjmsEndpoint().getResponseTimeOut();
}
+ /**
+ * Gets commitStrategy for this instance of SjmsProducer.
+ *
+ * @return TransactionCommitStrategy
+ */
+ public TransactionCommitStrategy getCommitStrategy() {
+ return getSjmsEndpoint().getCommitStrategy();
+ }
+
}
Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Provides a entry point into the transaction
+ * {@link org.apache.camel.spi.Synchronization} workflow that will allow a user
+ * to control when the {@link javax.jms.Session} commit operation is executed.
+ *
+ */
+public interface TransactionCommitStrategy {
+
+ /**
+ * Should returns true to allow the commit to proceed. If false, the commit
+ * will be skipped. The default should always be true to avoid messages
+ * remaining uncommitted.
+ *
+ * @param exchange {@link org.apache.camel.Exchange}
+ * @return true if the {@link javax.jms.Session} should be committed,
+ * otherwise false
+ * @throws Exception
+ */
+ boolean commit(Exchange exchange) throws Exception;
+
+ /**
+ * Should returns true to allow the commit to proceed. If false, the commit
+ * will be skipped. The default should always be true to avoid messages
+ * remaining uncommitted.
+ *
+ * @param exchange {@link org.apache.camel.Exchange}
+ * @return true if the {@link javax.jms.Session} should be committed,
+ * otherwise false
+ * @throws Exception
+ */
+ boolean rollback(Exchange exchange) throws Exception;
+}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java Thu Aug 30 03:42:12 2012
@@ -27,21 +27,22 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.component.sjms.SjmsConsumer;
import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.component.sjms.jms.ObjectPool;
+import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
/**
* A non-transacted queue consumer for a given JMS Destination
- *
*/
public class DefaultConsumer extends SjmsConsumer {
-
+
protected MessageConsumerPool consumers;
private final ExecutorService executor;
- protected class MessageConsumerPool extends
- ObjectPool<MessageConsumerResources> {
+ protected class MessageConsumerPool extends ObjectPool<MessageConsumerResources> {
public MessageConsumerPool() {
super(getConsumerCount());
@@ -50,9 +51,7 @@ public class DefaultConsumer extends Sjm
@Override
protected MessageConsumerResources createObject() throws Exception {
MessageConsumerResources model = null;
- if (isEndpointTransacted()
- || getSjmsEndpoint().getExchangePattern().equals(
- ExchangePattern.InOut)) {
+ if (isTransacted() || getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOut)) {
model = createConsumerWithDedicatedSession();
} else {
model = createConsumerListener();
@@ -61,8 +60,7 @@ public class DefaultConsumer extends Sjm
}
@Override
- protected void destroyObject(MessageConsumerResources model)
- throws Exception {
+ protected void destroyObject(MessageConsumerResources model) throws Exception {
if (model != null) {
if (model.getMessageConsumer() != null) {
if (model.getMessageConsumer().getMessageListener() != null) {
@@ -90,24 +88,20 @@ public class DefaultConsumer extends Sjm
private final MessageConsumer messageConsumer;
/**
- * TODO Add Constructor Javadoc
- *
- * @param session
* @param messageProducer
*/
public MessageConsumerResources(MessageConsumer messageConsumer) {
+ super();
this.session = null;
this.messageConsumer = messageConsumer;
}
/**
- * TODO Add Constructor Javadoc
- *
* @param session
* @param messageProducer
*/
- public MessageConsumerResources(Session session,
- MessageConsumer messageConsumer) {
+ public MessageConsumerResources(Session session, MessageConsumer messageConsumer) {
+ super();
this.session = session;
this.messageConsumer = messageConsumer;
}
@@ -135,8 +129,7 @@ public class DefaultConsumer extends Sjm
public DefaultConsumer(SjmsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- this.executor = endpoint.getCamelContext().getExecutorServiceManager()
- .newDefaultThreadPool(this, "SjmsConsumer");
+ this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
}
@Override
@@ -170,7 +163,7 @@ public class DefaultConsumer extends Sjm
private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
Connection conn = getConnectionResource().borrowConnection();
Session session = null;
- if (isEndpointTransacted()) {
+ if (isTransacted()) {
session = conn.createSession(true, Session.SESSION_TRANSACTED);
} else {
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -205,26 +198,30 @@ public class DefaultConsumer extends Sjm
/**
* Helper factory method used to create a MessageListener based on the MEP
*
- * @param session
- * a session is only required if we are a transacted consumer
+ * @param session a session is only required if we are a transacted consumer
* @return
*/
protected MessageListener createMessageHandler(Session session) {
+
+ TransactionCommitStrategy commitStrategy = null;
+ if (this.getCommitStrategy() != null) {
+ commitStrategy = this.getCommitStrategy();
+ } else if (this.getTransactionBatchCount() > 0) {
+ commitStrategy = new BatchTransactionCommitStrategy(this.getTransactionBatchCount());
+ } else {
+ commitStrategy = new DefaultTransactionCommitStrategy();
+ }
+
DefaultMessageHandler messageHandler = null;
- if (getSjmsEndpoint().getExchangePattern().equals(
- ExchangePattern.InOnly)) {
- if (isEndpointTransacted()) {
- messageHandler = new InOnlyMessageHandler(getEndpoint(),
- executor,
- new SessionTransactionSynchronization(session));
+ if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
+ if (isTransacted()) {
+ messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy));
} else {
messageHandler = new InOnlyMessageHandler(getEndpoint(), executor);
}
} else {
- if (isEndpointTransacted()) {
- messageHandler = new InOutMessageHandler(getEndpoint(),
- executor,
- new SessionTransactionSynchronization(session));
+ if (isTransacted()) {
+ messageHandler = new InOutMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy));
} else {
messageHandler = new InOutMessageHandler(getEndpoint(), executor);
}
@@ -232,7 +229,7 @@ public class DefaultConsumer extends Sjm
messageHandler.setSession(session);
messageHandler.setProcessor(getAsyncProcessor());
messageHandler.setSynchronous(isSynchronous());
- messageHandler.setTransacted(isEndpointTransacted());
+ messageHandler.setTransacted(isTransacted());
messageHandler.setTopic(isTopic());
return messageHandler;
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -18,7 +18,6 @@ package org.apache.camel.component.sjms.
import java.util.concurrent.ExecutorService;
-import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -27,12 +26,11 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.ObjectHelper;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,34 +38,41 @@ import static org.apache.camel.util.Obje
/**
* TODO Add Class documentation for DefaultMessageHandler
- *
*/
public abstract class DefaultMessageHandler implements MessageListener {
protected final Logger log = LoggerFactory.getLogger(getClass());
-
+
private final ExecutorService executor;
-
+
private Endpoint endpoint;
private AsyncProcessor processor;
private Session session;
private boolean transacted;
- private SessionAcknowledgementType acknowledgementType = SessionAcknowledgementType.AUTO_ACKNOWLEDGE;
private boolean synchronous = true;
- private Destination namedReplyTo;
private Synchronization synchronization;
private boolean topic;
+ private TransactionCommitStrategy commitStrategy;
public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor) {
- this(endpoint, executor, null);
+ this.endpoint = endpoint;
+ this.executor = executor;
}
public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
+ super();
this.synchronization = synchronization;
this.endpoint = endpoint;
this.executor = executor;
}
+ public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+ super();
+ this.endpoint = endpoint;
+ this.executor = executor;
+ this.commitStrategy = commitStrategy;
+ }
+
@Override
public void onMessage(Message message) {
handleMessage(message);
@@ -79,24 +84,19 @@ public abstract class DefaultMessageHand
private void handleMessage(Message message) {
RuntimeCamelException rce = null;
try {
- final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper
- .createExchange(message, getEndpoint());
- if (log.isDebugEnabled()) {
- log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
- }
+ final DefaultExchange exchange = (DefaultExchange)JmsMessageExchangeHelper.createExchange(message, getEndpoint());
+
+ log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+
if (isTransacted() && synchronization != null) {
exchange.addOnCompletion(synchronization);
}
try {
if (isTransacted() || isSynchronous()) {
- if (log.isDebugEnabled()) {
- log.debug(" Handling synchronous message: {}", exchange.getIn().getBody());
- }
+ log.debug(" Handling synchronous message: {}", exchange.getIn().getBody());
doHandleMessage(exchange);
} else {
- if (log.isDebugEnabled()) {
- log.debug(" Handling asynchronous message: {}", exchange.getIn().getBody());
- }
+ log.debug(" Handling asynchronous message: {}", exchange.getIn().getBody());
executor.execute(new Runnable() {
@Override
public void run() {
@@ -128,7 +128,7 @@ public abstract class DefaultMessageHand
}
public abstract void doHandleMessage(final Exchange exchange);
-
+
public abstract void close();
public void setTransacted(boolean transacted) {
@@ -151,15 +151,6 @@ public abstract class DefaultMessageHand
this.processor = processor;
}
- public SessionAcknowledgementType getAcknowledgementType() {
- return acknowledgementType;
- }
-
- public void setAcknowledgementType(
- SessionAcknowledgementType acknowledgementType) {
- this.acknowledgementType = acknowledgementType;
- }
-
public void setSession(Session session) {
this.session = session;
}
@@ -176,14 +167,6 @@ public abstract class DefaultMessageHand
return synchronous;
}
- public void setNamedReplyTo(Destination namedReplyToDestination) {
- this.namedReplyTo = namedReplyToDestination;
- }
-
- public Destination getNamedReplyTo() {
- return namedReplyTo;
- }
-
public void setTopic(boolean topic) {
this.topic = topic;
}
@@ -191,4 +174,8 @@ public abstract class DefaultMessageHand
public boolean isTopic() {
return topic;
}
+
+ public TransactionCommitStrategy getCommitStrategy() {
+ return commitStrategy;
+ }
}
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -21,27 +21,27 @@ import java.util.concurrent.ExecutorServ
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.AsyncProcessorHelper;
/**
- * TODO Add Class documentation for DefaultMessageHandler
+ * An InOnly {@link DefaultMessageHandler}
*
*/
public class InOnlyMessageHandler extends DefaultMessageHandler {
/**
- *
* @param endpoint
- * @param processor
+ * @param executor
*/
public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor) {
- this(endpoint, executor, null);
+ super(endpoint, executor);
}
/**
- *
- * @param stopped
+ * @param endpoint
+ * @param executor
* @param synchronization
*/
public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
@@ -49,6 +49,16 @@ public class InOnlyMessageHandler extend
}
/**
+ * @param endpoint
+ * @param executor
+ * @param commitStrategy
+ * @param rollbackStrategy
+ */
+ public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+ super(endpoint, executor, commitStrategy);
+ }
+
+ /**
* @param message
*/
@Override
@@ -61,6 +71,7 @@ public class InOnlyMessageHandler extend
} else {
NoOpAsyncCallback callback = new NoOpAsyncCallback();
if (isTransacted() || isSynchronous()) {
+ //SessionTransactionAsyncCallback callback = new SessionTransactionAsyncCallback(exchange, getSession(), getCommitStrategy());
// must process synchronous if transacted or configured to
// do so
if (log.isDebugEnabled()) {
@@ -75,11 +86,10 @@ public class InOnlyMessageHandler extend
}
} else {
// process asynchronous using the async routing engine
- if (log.isDebugEnabled()) {
- log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
- }
- boolean sync = AsyncProcessorHelper.process(getProcessor(),
- exchange, callback);
+ log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+ boolean sync = false;
+
+ sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
if (!sync) {
// will be done async so return now
return;
@@ -87,7 +97,7 @@ public class InOnlyMessageHandler extend
}
}
}
-
+
@Override
public void close() {
// no-op
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.sjms.consumer;
+import java.io.InputStream;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
@@ -32,42 +33,58 @@ import javax.jms.Topic;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
import org.apache.camel.component.sjms.jms.JmsObjectFactory;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
/**
- * TODO Add Class documentation for DefaultMessageHandler
- * TODO Create a producer cache manager to store and purge unused cashed producers or we will have a memory leak
- *
+ * TODO Add Class documentation for DefaultMessageHandler
+ * TODO Create a producer
+ * cache manager to store and purge unused cashed producers or we will have a
+ * memory leak
*/
public class InOutMessageHandler extends DefaultMessageHandler {
private Map<String, MessageProducer> producerCache = new TreeMap<String, MessageProducer>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
+
/**
* TODO Add Constructor Javadoc
- *
+ *
* @param endpoint
- * @param processor
+ * @param executor
*/
public InOutMessageHandler(Endpoint endpoint, ExecutorService executor) {
- this(endpoint, executor, null);
+ super(endpoint, executor);
}
-
+
/**
* TODO Add Constructor Javadoc
- *
- * @param stopped
+ *
+ * @param endpoint
+ * @param executor
* @param synchronization
*/
public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
super(endpoint, executor, synchronization);
}
-
+
+ /**
+ * TODO Add Constructor Javadoc
+ *
+ * @param endpoint
+ * @param executor
+ * @param commitStrategy
+ * @param rollbackStrategy
+ */
+ public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+ super(endpoint, executor, commitStrategy);
+ }
+
/**
* @param message
*/
@@ -83,8 +100,7 @@ public class InOutMessageHandler extends
} else if (obj instanceof String) {
replyTo = JmsObjectFactory.createDestination(getSession(), (String)obj, isTopic());
} else {
- throw new Exception("The value of JMSReplyTo must be a valid Destination or String. Value provided: "
- + obj);
+ throw new Exception("The value of JMSReplyTo must be a valid Destination or String. Value provided: " + obj);
}
String destinationName = getDestinationName(replyTo);
@@ -106,7 +122,7 @@ public class InOutMessageHandler extends
}
}
}
-
+
MessageHanderAsyncCallback callback = new MessageHanderAsyncCallback(exchange, messageProducer);
if (exchange.isFailed()) {
return;
@@ -114,9 +130,7 @@ public class InOutMessageHandler extends
if (isTransacted() || isSynchronous()) {
// must process synchronous if transacted or configured to
// do so
- if (log.isDebugEnabled()) {
- log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
- }
+ log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
try {
AsyncProcessorHelper.process(getProcessor(), exchange);
} catch (Exception e) {
@@ -126,9 +140,7 @@ public class InOutMessageHandler extends
}
} else {
// process asynchronous using the async routing engine
- if (log.isDebugEnabled()) {
- log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
- }
+ log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
boolean sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
if (!sync) {
// will be done async so return now
@@ -141,11 +153,10 @@ public class InOutMessageHandler extends
}
if (log.isDebugEnabled()) {
- log.debug("SjmsMessageConsumer invoked for Exchange id:{} ",
- exchange.getExchangeId());
+ log.debug("SjmsMessageConsumer invoked for Exchange id:{} ", exchange.getExchangeId());
}
}
-
+
@Override
public void close() {
for (String key : producerCache.keySet()) {
@@ -158,11 +169,11 @@ public class InOutMessageHandler extends
}
producerCache.clear();
}
-
+
private boolean isDestination(Object object) {
return object instanceof Destination;
}
-
+
private String getDestinationName(Destination destination) throws Exception {
String answer = null;
if (destination instanceof Queue) {
@@ -180,6 +191,7 @@ public class InOutMessageHandler extends
private MessageProducer localProducer;
public MessageHanderAsyncCallback(Exchange exchange, MessageProducer localProducer) {
+ super();
this.exchange = exchange;
this.localProducer = localProducer;
}
@@ -188,10 +200,15 @@ public class InOutMessageHandler extends
public void done(boolean sync) {
try {
- Message response = JmsMessageHelper.createMessage(exchange,
- getSession(), true);
- response.setJMSCorrelationID(exchange.getIn().getHeader(
- "JMSCorrelationID", String.class));
+ Object body = exchange.getOut().getBody();
+ if (body != null) {
+ if (body instanceof InputStream) {
+ byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
+ exchange.getOut().setBody(bytes);
+ }
+ }
+ Message response = JmsMessageExchangeHelper.createMessage(exchange, getSession(), true);
+ response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", String.class));
localProducer.send(response);
} catch (Exception e) {
exchange.setException(e);
Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java Thu Aug 30 03:42:12 2012
@@ -34,6 +34,7 @@ public class ConnectionFactoryResource e
* Default Constructor
*/
public ConnectionFactoryResource() {
+ super();
}
/**
Copied: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java (from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java?p2=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java&p1=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java Thu Aug 30 03:42:12 2012
@@ -57,11 +57,11 @@ import static org.apache.camel.util.Obje
*
* @version
*/
-public final class JmsMessageHelper {
+public final class JmsMessageExchangeHelper {
- private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageExchangeHelper.class);
- private JmsMessageHelper() {
+ private JmsMessageExchangeHelper() {
}
public static Exchange createExchange(Message message, Endpoint endpoint) {
@@ -72,7 +72,7 @@ public final class JmsMessageHelper {
@SuppressWarnings("unchecked")
public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
try {
- JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
+ JmsMessageExchangeHelper.setJmsMessageHeaders(message, exchange, out);
if (message != null) {
// convert to JMS Message of the given type
@@ -82,7 +82,7 @@ public final class JmsMessageHelper {
} else {
bodyMessage = (DefaultMessage) exchange.getIn();
}
- switch (JmsMessageHelper.discoverType(message)) {
+ switch (JmsMessageExchangeHelper.discoverType(message)) {
case Bytes:
BytesMessage bytesMessage = (BytesMessage) message;
if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
@@ -269,9 +269,7 @@ public final class JmsMessageHelper {
try {
message.setJMSReplyTo(replyTo);
} catch (Exception e) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
- }
+ LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
}
}
@@ -387,7 +385,7 @@ public final class JmsMessageHelper {
} else if (headerName.equalsIgnoreCase("JMSPriority")) {
jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
} else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
- JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+ JmsMessageExchangeHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
} else if (headerName.equalsIgnoreCase("JMSExpiration")) {
jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
} else {
@@ -410,7 +408,7 @@ public final class JmsMessageHelper {
} else {
if (!(headerValue instanceof JmsMessageType)) {
String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
- JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+ JmsMessageExchangeHelper.setProperty(jmsMessage, encodedName, headerValue);
}
}
}
@@ -492,10 +490,10 @@ public final class JmsMessageHelper {
jmsMessage.getJMSTimestamp());
headers.put(
JmsMessageHeaderType.JMSReplyTo.toString(),
- JmsMessageHelper.getJMSReplyTo(jmsMessage));
+ JmsMessageExchangeHelper.getJMSReplyTo(jmsMessage));
headers.put(
JmsMessageHeaderType.JMSType.toString(),
- JmsMessageHelper.getJMSType(jmsMessage));
+ JmsMessageExchangeHelper.getJMSType(jmsMessage));
// this works around a bug in the ActiveMQ property handling
headers.put(
@@ -537,7 +535,7 @@ public final class JmsMessageHelper {
} else {
body = exchange.getIn().getBody();
}
- JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
+ JmsMessageType messageType = JmsMessageExchangeHelper.discoverType(exchange);
switch (messageType) {
case Bytes:
@@ -566,6 +564,7 @@ public final class JmsMessageHelper {
answer = textMessage;
break;
default:
+ answer = session.createMessage();
break;
}
} catch (Exception e) {
@@ -573,9 +572,60 @@ public final class JmsMessageHelper {
throw e;
}
- answer = JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
+ answer = JmsMessageExchangeHelper.setJmsMessageHeaders(exchange, answer);
return answer;
}
+//
+// @SuppressWarnings("unchecked")
+// public static Message createMessage(Session session, Object payload, ) throws Exception {
+// Message answer = null;
+// Object body = null;
+// try {
+// if (out && exchange.getOut().getBody() != null) {
+// body = exchange.getOut().getBody();
+// } else {
+// body = exchange.getIn().getBody();
+// }
+// JmsMessageType messageType = JmsMessageExchangeHelper.discoverType(exchange);
+//
+// switch (messageType) {
+// case Bytes:
+// BytesMessage bytesMessage = session.createBytesMessage();
+// bytesMessage.writeBytes((byte[])body);
+// answer = bytesMessage;
+// break;
+// case Map:
+// MapMessage mapMessage = session.createMapMessage();
+// Map<String, Object> objMap = (Map<String, Object>)body;
+// Set<String> keys = objMap.keySet();
+// for (String key : keys) {
+// Object value = objMap.get(key);
+// mapMessage.setObject(key, value);
+// }
+// answer = mapMessage;
+// break;
+// case Object:
+// ObjectMessage objectMessage = session.createObjectMessage();
+// objectMessage.setObject((Serializable)body);
+// answer = objectMessage;
+// break;
+// case Text:
+// TextMessage textMessage = session.createTextMessage();
+// textMessage.setText((String)body);
+// answer = textMessage;
+// break;
+// default:
+// break;
+// }
+// } catch (Exception e) {
+// LOGGER.error("TODO Auto-generated catch block", e);
+// throw e;
+// }
+//
+// answer = JmsMessageExchangeHelper.setJmsMessageHeaders(exchange, answer);
+// return answer;
+// }
+//
private static boolean hasIllegalHeaderKey(String key) {
if (key == null) {