You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/30 05:42:14 UTC

svn commit: r1378796 [1/3] - in /camel/trunk/components/camel-sjms: ./ src/main/java/org/apache/camel/component/sjms/ src/main/java/org/apache/camel/component/sjms/consumer/ src/main/java/org/apache/camel/component/sjms/jms/ src/main/java/org/apache/ca...

Author: ningjiang
Date: Thu Aug 30 03:42:12 2012
New Revision: 1378796

URL: http://svn.apache.org/viewvc?rev=1378796&view=rev
Log:
CAMEL-5497 Add Batch Transaction Support with thanks to Scott

Added:
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java
      - copied, changed from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java
      - copied, changed from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/BatchTransactedTopicProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyQueueConsumerTest.java
      - copied, changed from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedInOnlyTopicConsumerTest.java
      - copied, changed from r1378786, camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/tx/TransactedTopicProducerTest.java
Removed:
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyQueueConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/TransactedInOnlyTopicConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/TransactedQueueProducerTest.java
Modified:
    camel/trunk/components/camel-sjms/pom.xml
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
    camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
    camel/trunk/components/camel-sjms/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-sjms/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/pom.xml?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/pom.xml (original)
+++ camel/trunk/components/camel-sjms/pom.xml Thu Aug 30 03:42:12 2012
@@ -1,22 +1,25 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+            http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -31,6 +34,7 @@
     <name>Camel :: SJMS</name>
     <description>A pure Java JMS Camel Component</description>
 
+
     <properties>
         <camel.osgi.export.pkg>
             org.apache.camel.component.sjms,
@@ -46,21 +50,33 @@
             org.apache.camel.component.sjms.producer,
             org.apache.camel.component.sjms.tx
         </camel.osgi.private.pkg>
-    </properties>
 
+        <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
+    </properties>
     <dependencies>
-
+    <!-- =============================== -->
+    <!-- Required Dependencies -->
+    <!-- =============================== -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-annotation_1.0_spec</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
-            <scope>provided</scope>
+            <version>1.1.1</version>
         </dependency>
 
-        <!-- for testing -->
+    <!-- for testing -->
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test</artifactId>
@@ -98,35 +114,64 @@
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
+            <optional>true</optional>
             <scope>test</scope>
         </dependency>
     </dependencies>
 
     <build>
-        <plugins>
+        <defaultGoal>install</defaultGoal>
 
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.3.2</version>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>      <!-- Configure which tests are included/excuded -->
             <plugin>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
+                    <forkMode>pertest</forkMode>
                     <childDelegation>false</childDelegation>
                     <useFile>true</useFile>
-                    <forkMode>pertest</forkMode>
-                    <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
+                    <argLine>-Xmx512M</argLine>
+                    <systemProperties>
+                        <property>
+                            <name>org.apache.activemq.default.directory.prefix</name>
+                            <value>target/activemq/</value>
+                        </property>
+                    </systemProperties>
                 </configuration>
             </plugin>
-
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-clean-plugin</artifactId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.8.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
                 <configuration>
-                    <filesets>
-                        <fileset>
-                            <directory>${basedir}/activemq-data</directory>
-                        </fileset>
-                    </filesets>
+                    <forkMode>pertest</forkMode>
+                    <childDelegation>false</childDelegation>
+                    <useFile>true</useFile>
+                    <argLine>-Xmx512M</argLine>
+                    <systemProperties>
+                        <property>
+                            <name>org.apache.activemq.default.directory.prefix</name>
+                            <value>target/activemq/</value>
+                        </property>
+                    </systemProperties>
                 </configuration>
             </plugin>
         </plugins>
     </build>
-
 </project>

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/BatchMessage.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link List} of these objects can be used to batch a collection of body and
+ * header pairs in one exchange.
+ * 
+ */
+public class BatchMessage<T> {
+    private T payload;
+    private Map<String, Object> headers;
+
+    /**
+     * @param payload may not be null
+     * @param headers may be null
+     */
+    public BatchMessage(T payload, Map<String, Object> headers) {
+        super();
+        if (payload == null) {
+            throw new IllegalArgumentException("Payload may not be null");
+        } else {
+            this.payload = payload;
+        }
+        this.headers = headers;
+    }
+
+    public T getPayload() {
+        return payload;
+    }
+
+    public Map<String, Object> getHeaders() {
+        return headers;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((headers == null) ? 0 : headers.hashCode());
+        result = prime * result + ((payload == null) ? 0 : payload.hashCode());
+        return result;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof BatchMessage)) {
+            return false;
+        }
+        BatchMessage other = (BatchMessage)obj;
+        if (headers == null) {
+            if (other.headers != null) {
+                return false;
+            }
+        } else if (!headers.equals(other.headers)) {
+            return false;
+        }
+        if (payload == null) {
+            if (other.payload != null) {
+                return false;
+            }
+        } else if (!payload.equals(other.payload)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "BatchMessage [payload=" + payload + ", headers=" + headers + "]";
+    }
+}

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/DefaultJmsKeyFormatStrategy.java Thu Aug 30 03:42:12 2012
@@ -19,9 +19,10 @@ package org.apache.camel.component.sjms;
 /**
  * Default strategy that handles dots and hyphens.
  * <p/>
- * This can be used for sending keys containg package names that is common by Java frameworks.
- *
- * @version 
+ * This can be used for sending keys contain package names that is common by
+ * Java frameworks.
+ * 
+ * @version
  */
 public class DefaultJmsKeyFormatStrategy implements KeyFormatStrategy {
 

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/IllegalHeaderException.java Thu Aug 30 03:42:12 2012
@@ -24,11 +24,13 @@ import org.apache.camel.RuntimeCamelExce
  */
 public class IllegalHeaderException extends RuntimeCamelException {
 
-    private static final long serialVersionUID = 1L;
+   
+    private static final long serialVersionUID = 3136304415267471091L;
 
     /**
      */
     public IllegalHeaderException() {
+        super();
     }
 
     /**

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java Thu Aug 30 03:42:12 2012
@@ -17,23 +17,25 @@
 package org.apache.camel.component.sjms;
 
 /**
- * Strategy for applying encoding and decoding of JMS headers so they apply to the JMS spec.
- *
- * @version 
+ * Strategy for applying encoding and decoding of JMS headers so they apply to
+ * the JMS spec.
+ * 
+ * @version
  */
 public interface KeyFormatStrategy {
 
     /**
      * Encodes the key before its sent as a {@link javax.jms.Message} message.
-     *
-     * @param key  the original key
+     * 
+     * @param key the original key
      * @return the encoded key
      */
     String encodeKey(String key);
 
     /**
-     * Decodes the key after its received from a {@link javax.jms.Message} message.
-     *
+     * Decodes the key after its received from a {@link javax.jms.Message}
+     * message.
+     * 
      * @param key the encoded key
      * @return the decoded key as the original key
      */

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MissingHeaderException.java Thu Aug 30 03:42:12 2012
@@ -20,26 +20,25 @@ import org.apache.camel.RuntimeCamelExce
 
 /**
  * TODO Add Class documentation for MissingHeaderException
- *
  */
 public class MissingHeaderException extends RuntimeCamelException {
 
     /**
      * 
      */
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -6184009502090347023L;
 
     /**
      * TODO Add Constructor Javadoc
-     *
      */
     public MissingHeaderException() {
+        super();
         // TODO Auto-generated constructor stub
     }
 
     /**
      * TODO Add Constructor Javadoc
-     *
+     * 
      * @param message
      * @param cause
      */
@@ -50,7 +49,7 @@ public class MissingHeaderException exte
 
     /**
      * TODO Add Constructor Javadoc
-     *
+     * 
      * @param message
      */
     public MissingHeaderException(String message) {
@@ -60,7 +59,7 @@ public class MissingHeaderException exte
 
     /**
      * TODO Add Constructor Javadoc
-     *
+     * 
      * @param cause
      */
     public MissingHeaderException(Throwable cause) {
@@ -68,5 +67,4 @@ public class MissingHeaderException exte
         // TODO Auto-generated constructor stub
     }
 
-    
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java Thu Aug 30 03:42:12 2012
@@ -25,6 +25,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
@@ -43,8 +44,9 @@ public class SjmsComponent extends Defau
     private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
     private KeyFormatStrategy keyFormatStrategy;
     private Integer maxConnections = 1;
+    private TransactionCommitStrategy commitStrategy = new DefaultTransactionCommitStrategy();
 
-    /*
+    /**
      * @see
      * org.apache.camel.impl.DefaultComponent#createEndpoint(java.lang.String,
      * java.lang.String, java.util.Map)
@@ -200,4 +202,25 @@ public class SjmsComponent extends Defau
     public KeyFormatStrategy getKeyFormatStrategy() {
         return keyFormatStrategy;
     }
+
+    /**
+     * Gets the TransactionCommitStrategy value of commitStrategy for this
+     * instance of SjmsComponent.
+     * 
+     * @return the commitStrategy
+     */
+    public TransactionCommitStrategy getCommitStrategy() {
+        return commitStrategy;
+    }
+
+    /**
+     * Sets the TransactionCommitStrategy value of commitStrategy for this
+     * instance of SjmsComponent.
+     * 
+     * @param commitStrategy Sets TransactionCommitStrategy, default is TODO add
+     *            default
+     */
+    public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
+        this.commitStrategy = commitStrategy;
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java Thu Aug 30 03:42:12 2012
@@ -18,17 +18,16 @@ package org.apache.camel.component.sjms;
 
 /**
  * TODO Add Class documentation for SjmsConstants
- *
  */
 public final class SjmsConstants {
     public static final String QUEUE_PREFIX = "queue:";
     public static final String TOPIC_PREFIX = "topic:";
     public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
     public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
-    
+
     public static final String JMS_MESSAGE_TYPE = "JmsMessageType";
     public static final String ORIGINAL_MESSAGE = "SjmsOriginalMessage";
-    
+
     private SjmsConstants() {
         // Helper class
     }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java Thu Aug 30 03:42:12 2012
@@ -27,14 +27,13 @@ import org.apache.camel.impl.DefaultCons
 
 /**
  * TODO Add Class documentation for SjmsConsumer
- *
  */
 public class SjmsConsumer extends DefaultConsumer {
 
     public SjmsConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
-    
+
     /**
      * @return
      */
@@ -49,16 +48,20 @@ public class SjmsConsumer extends Defaul
     protected SjmsEndpoint getSjmsEndpoint() {
         return (SjmsEndpoint)this.getEndpoint();
     }
-    
+
     protected ConnectionResource getConnectionResource() {
         return getSjmsEndpoint().getConnectionResource();
     }
-    
+
     protected SessionPool getSessionPool() {
         return getSjmsEndpoint().getSessions();
     }
 
-    public boolean isEndpointTransacted() {
+    public int getAcknowledgementMode() {
+        return getSjmsEndpoint().getAcknowledgementMode().intValue();
+    }
+
+    public boolean isTransacted() {
         return getSjmsEndpoint().isTransacted();
     }
 
@@ -73,7 +76,7 @@ public class SjmsConsumer extends Defaul
     public int getConsumerCount() {
         return getSjmsEndpoint().getConsumerCount();
     }
-    
+
     public boolean isTopic() {
         return getSjmsEndpoint().isTopic();
     }
@@ -85,4 +88,12 @@ public class SjmsConsumer extends Defaul
     public String getDurableSubscriptionId() {
         return getSjmsEndpoint().getDurableSubscriptionId();
     }
+
+    public TransactionCommitStrategy getCommitStrategy() {
+        return getSjmsEndpoint().getCommitStrategy();
+    }
+
+    public int getTransactionBatchCount() {
+        return getSjmsEndpoint().getTransactionBatchCount();
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java Thu Aug 30 03:42:12 2012
@@ -31,17 +31,14 @@ import org.apache.camel.component.sjms.p
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * TODO Add Class documentation for SjmsEndpoint
- *
  */
 public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
-    protected final transient Logger logger = LoggerFactory
-            .getLogger(getClass());
+    protected final transient Logger logger = LoggerFactory.getLogger(getClass());
 
     private SessionPool sessions;
     private boolean synchronous = true;
@@ -57,9 +54,11 @@ public class SjmsEndpoint extends Defaul
     private String durableSubscriptionId;
     private long responseTimeOut = 5000;
     private String messageSelector;
-    
+    private int transactionBatchCount = -1;
+    private TransactionCommitStrategy commitStrategy;
 
     public SjmsEndpoint() {
+        super();
     }
 
     public SjmsEndpoint(String uri, Component component) {
@@ -68,31 +67,31 @@ public class SjmsEndpoint extends Defaul
             setTopic(false);
         } else if (getEndpointUri().indexOf("://topic:") > -1) {
             setTopic(true);
-        }  else {
+        } else {
             throw new RuntimeCamelException("Endpoint URI unsupported: " + uri);
         }
     }
-    
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        
+
         //
-        // TODO since we only need a session pool for one use case, find a better way
+        // TODO since we only need a session pool for one use case, find a
+        // better way
         //
         // We only create a session pool when we are not transacted.
         // Transacted listeners or producers need to be paired with the
         // Session that created them.
-        if (!isTransacted()) {
+        if (!isTransacted() && getExchangePattern().equals(ExchangePattern.InOnly)) {
             sessions = new SessionPool(getSessionCount(), getConnectionResource());
-            
+
             // TODO fix the string hack
-            sessions.setAcknowledgeMode(SessionAcknowledgementType
-                    .valueOf(getAcknowledgementMode() + ""));
+            sessions.setAcknowledgeMode(SessionAcknowledgementType.valueOf(getAcknowledgementMode() + ""));
             getSessions().fillPool();
         }
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         if (getSessions() != null) {
@@ -116,12 +115,12 @@ public class SjmsEndpoint extends Defaul
     public Consumer createConsumer(Processor processor) throws Exception {
         return new DefaultConsumer(this, processor);
     }
-    
+
     @Override
     public boolean isMultipleConsumersSupported() {
         return true;
     }
-    
+
     @Override
     public boolean isSingleton() {
         return true;
@@ -136,7 +135,7 @@ public class SjmsEndpoint extends Defaul
     }
 
     public SjmsComponent getSjmsComponent() {
-        return (SjmsComponent) this.getComponent();
+        return (SjmsComponent)this.getComponent();
     }
 
     public ConnectionResource getConnectionResource() {
@@ -262,4 +261,20 @@ public class SjmsEndpoint extends Defaul
     public String getMessageSelector() {
         return messageSelector;
     }
+
+    public TransactionCommitStrategy getCommitStrategy() {
+        return commitStrategy;
+    }
+
+    public void setCommitStrategy(TransactionCommitStrategy commitStrategy) {
+        this.commitStrategy = commitStrategy;
+    }
+
+    public int getTransactionBatchCount() {
+        return transactionBatchCount;
+    }
+
+    public void setTransactionBatchCount(int transactionBatchCount) {
+        this.transactionBatchCount = transactionBatchCount;
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java Thu Aug 30 03:42:12 2012
@@ -19,7 +19,7 @@ package org.apache.camel.component.sjms;
 import org.apache.camel.impl.DefaultHeaderFilterStrategy;
 
 /**
- * @version 
+ * @version
  */
 public class SjmsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
 
@@ -28,8 +28,10 @@ public class SjmsHeaderFilterStrategy ex
     }
 
     protected void initialize() {
-        // ignore provider specified JMS extension headers see page 39 of JMS 1.1 specification
-        // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in AqjmsMessage
+        // ignore provider specified JMS extension headers see page 39 of JMS
+        // 1.1 specification
+        // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in
+        // AqjmsMessage
         getOutFilter().add("JMSXUserID");
         getOutFilter().add("JMSXAppID");
         getOutFilter().add("JMSXDeliveryCount");

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessageConsumer.java Thu Aug 30 03:42:12 2012
@@ -26,11 +26,13 @@ import org.apache.camel.component.sjms.j
 
 /**
  * TODO Add Class documentation for SjmsMessageConsumer
- *
  */
 public interface SjmsMessageConsumer extends MessageListener {
     void handleMessage(Message message);
+
     SjmsMessageConsumer createMessageConsumer(ConnectionResource connectionResource, String destinationName) throws Exception;
+
     SjmsMessageConsumer createMessageConsumerListener(SessionPool sessionPool, String destinationName, Exchanger<Object> exchanger) throws Exception;
+
     void destroyMessageConsumer() throws Exception;
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.sjms;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.concurrent.ExecutorService;
 
 import javax.jms.MessageProducer;
@@ -31,14 +35,12 @@ import org.apache.camel.util.ObjectHelpe
 
 /**
  * Base SjmsProducer class.
- *
  */
-public abstract class SjmsProducer extends DefaultAsyncProducer  {
+public abstract class SjmsProducer extends DefaultAsyncProducer {
 
-    
     /**
-     * The {@link MessageProducerResources} pool for all {@link SjmsProducer} classes.
-     *
+     * The {@link MessageProducerResources} pool for all {@link SjmsProducer}
+     * classes.
      */
     protected class MessageProducerPool extends ObjectPool<MessageProducerResources> {
 
@@ -50,7 +52,7 @@ public abstract class SjmsProducer exten
         protected MessageProducerResources createObject() throws Exception {
             return doCreateProducerModel();
         }
-        
+
         @Override
         protected void destroyObject(MessageProducerResources model) throws Exception {
             if (model.getMessageProducer() != null) {
@@ -73,23 +75,25 @@ public abstract class SjmsProducer exten
             }
         }
     }
-    
+
     /**
-     * The {@link MessageProducer} resources for all {@link SjmsProducer} classes. 
+     * The {@link MessageProducer} resources for all {@link SjmsProducer}
+     * classes.
      */
     protected class MessageProducerResources {
         private final Session session;
         private final MessageProducer messageProducer;
+        private final TransactionCommitStrategy commitStrategy;
 
-        /**
-         * TODO Add Constructor Javadoc
-         * 
-         * @param session
-         * @param messageProducer
-         */
         public MessageProducerResources(Session session, MessageProducer messageProducer) {
+            this(session, messageProducer, null);
+        }
+
+        public MessageProducerResources(Session session, MessageProducer messageProducer, TransactionCommitStrategy commitStrategy) {
+            super();
             this.session = session;
             this.messageProducer = messageProducer;
+            this.commitStrategy = commitStrategy;
         }
 
         /**
@@ -111,8 +115,18 @@ public abstract class SjmsProducer exten
         public MessageProducer getMessageProducer() {
             return messageProducer;
         }
+
+        /**
+         * Gets the TransactionCommitStrategy value of commitStrategy for this
+         * instance of SjmsProducer.MessageProducerResources.
+         * 
+         * @return the commitStrategy
+         */
+        public TransactionCommitStrategy getCommitStrategy() {
+            return commitStrategy;
+        }
     }
-    
+
     private MessageProducerPool producers;
     private final ExecutorService executor;
 
@@ -135,24 +149,29 @@ public abstract class SjmsProducer exten
         super.doStop();
         if (getProducers() != null) {
             getProducers().drainPool();
-            setProducers(null);   
+            setProducers(null);
         }
     }
-    
+
     public abstract MessageProducerResources doCreateProducerModel() throws Exception;
-    
+
     public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception;
-    
+
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        if (log.isDebugEnabled()) {
-            log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+        log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+        
+        Object body = exchange.getIn().getBody();
+        if (body != null) {
+            if (body instanceof InputStream) {
+                byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
+                exchange.getIn().setBody(bytes);
+            }
         }
+        
         try {
             if (!isSynchronous()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message asynchronously: {}", exchange.getIn().getBody());
-                }
+                log.debug("  Sending message asynchronously: {}", exchange.getIn().getBody());
                 getExecutor().execute(new Runnable() {
                     @Override
                     public void run() {
@@ -164,37 +183,70 @@ public abstract class SjmsProducer exten
                     }
                 });
             } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("  Sending message synchronously: {}", exchange.getIn().getBody());
-                }
+                log.debug("  Sending message synchronously: {}", exchange.getIn().getBody());
                 sendMessage(exchange, callback);
             }
         } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
-            }
-            if (log.isTraceEnabled()) {
-                log.trace("Exception: " + e.getLocalizedMessage(), e);
-            }
+            log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
+            log.trace("Exception: " + e.getLocalizedMessage(), e);
             exchange.setException(e);
         }
-        if (log.isDebugEnabled()) {
-            log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
-        }
+        log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
+        
         return isSynchronous();
     }
+    
+    public static byte[] getBytes(InputStream is) throws IOException {
+        int len;
+        int size = 1024;
+        byte[] buf;
+
+        if (is instanceof ByteArrayInputStream) {
+            size = is.available();
+            buf = new byte[size];
+            len = is.read(buf, 0, size);
+        } else {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            buf = new byte[size];
+            while ((len = is.read(buf, 0, size)) != -1) {
+                bos.write(buf, 0, len);
+            }
+            buf = bos.toByteArray();
+        }
+        return buf;
+    }
+    
+//    public static byte[] getBytesFromStream(InputStream is) throws IOException {
+//        BufferedInputStream bis = new BufferedInputStream(is);
+//        bis.available();
+//
+//        // Create the byte array to hold the data
+//        byte[] bytes = new byte[(int)bis.available()];
+//
+//        // Read in the bytes
+//        int offset = 0;
+//        int numRead = 0;
+//        while (offset < bytes.length && (numRead = is.read(bytes, offset, bytes.length - offset)) >= 0) {
+//            offset += numRead;
+//        }
+//
+//        // Close the input stream and return bytes
+//        is.close();
+//        return bytes;
+//    }
+    
 
     protected SjmsEndpoint getSjmsEndpoint() {
         return (SjmsEndpoint)this.getEndpoint();
     }
-    
+
     protected ConnectionResource getConnectionResource() {
         return getSjmsEndpoint().getConnectionResource();
     }
-    
+
     /**
      * Gets the acknowledgment mode for this instance of DestinationProducer.
-     *
+     * 
      * @return int
      */
     public int getAcknowledgeMode() {
@@ -203,8 +255,8 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the synchronous value for this instance of DestinationProducer.
-     *
-     * @return true if synchronous, otherwise false 
+     * 
+     * @return true if synchronous, otherwise false
      */
     public boolean isSynchronous() {
         return getSjmsEndpoint().isSynchronous();
@@ -212,7 +264,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the replyTo for this instance of DestinationProducer.
-     *
+     * 
      * @return String
      */
     public String getReplyTo() {
@@ -221,7 +273,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the destinationName for this instance of DestinationProducer.
-     *
+     * 
      * @return String
      */
     public String getDestinationName() {
@@ -230,7 +282,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Sets the producer pool for this instance of SjmsProducer.
-     *
+     * 
      * @param producers A MessageProducerPool
      */
     public void setProducers(MessageProducerPool producers) {
@@ -238,8 +290,9 @@ public abstract class SjmsProducer exten
     }
 
     /**
-     * Gets the MessageProducerPool value of producers for this instance of SjmsProducer.
-     *
+     * Gets the MessageProducerPool value of producers for this instance of
+     * SjmsProducer.
+     * 
      * @return the producers
      */
     public MessageProducerPool getProducers() {
@@ -275,7 +328,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the producerCount for this instance of SjmsProducer.
-     *
+     * 
      * @return int
      */
     public int getProducerCount() {
@@ -284,7 +337,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets consumerCount for this instance of SjmsProducer.
-     *
+     * 
      * @return int
      */
     public int getConsumerCount() {
@@ -293,7 +346,7 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the executor for this instance of SjmsProducer.
-     *
+     * 
      * @return ExecutorService
      */
     public ExecutorService getExecutor() {
@@ -302,30 +355,38 @@ public abstract class SjmsProducer exten
 
     /**
      * Gets the ttl for this instance of SjmsProducer.
-     *
+     * 
      * @return long
      */
     public long getTtl() {
-        return  getSjmsEndpoint().getTtl();
+        return getSjmsEndpoint().getTtl();
     }
 
     /**
      * Gets the boolean value of persistent for this instance of SjmsProducer.
-     *
+     * 
      * @return true if persistent, otherwise false
      */
     public boolean isPersistent() {
-        return  getSjmsEndpoint().isPersistent();
+        return getSjmsEndpoint().isPersistent();
     }
 
-
     /**
      * Gets responseTimeOut for this instance of SjmsProducer.
-     *
+     * 
      * @return long
      */
     public long getResponseTimeOut() {
         return getSjmsEndpoint().getResponseTimeOut();
     }
 
+    /**
+     * Gets commitStrategy for this instance of SjmsProducer.
+     * 
+     * @return TransactionCommitStrategy
+     */
+    public TransactionCommitStrategy getCommitStrategy() {
+        return getSjmsEndpoint().getCommitStrategy();
+    }
+
 }

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Provides a entry point into the transaction
+ * {@link org.apache.camel.spi.Synchronization} workflow that will allow a user
+ * to control when the {@link javax.jms.Session} commit operation is executed.
+ * 
+ */
+public interface TransactionCommitStrategy {
+
+    /**
+     * Should returns true to allow the commit to proceed. If false, the commit
+     * will be skipped. The default should always be true to avoid messages
+     * remaining uncommitted.
+     * 
+     * @param exchange {@link org.apache.camel.Exchange}
+     * @return true if the {@link javax.jms.Session} should be committed,
+     *         otherwise false
+     * @throws Exception
+     */
+    boolean commit(Exchange exchange) throws Exception;
+
+    /**
+     * Should returns true to allow the commit to proceed. If false, the commit
+     * will be skipped. The default should always be true to avoid messages
+     * remaining uncommitted.
+     * 
+     * @param exchange {@link org.apache.camel.Exchange}
+     * @return true if the {@link javax.jms.Session} should be committed,
+     *         otherwise false
+     * @throws Exception
+     */
+    boolean rollback(Exchange exchange) throws Exception;
+}

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultConsumer.java Thu Aug 30 03:42:12 2012
@@ -27,21 +27,22 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.sjms.SjmsConsumer;
 import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.jms.ObjectPool;
+import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 
 /**
  * A non-transacted queue consumer for a given JMS Destination
- * 
  */
 public class DefaultConsumer extends SjmsConsumer {
-    
+
     protected MessageConsumerPool consumers;
     private final ExecutorService executor;
 
-    protected class MessageConsumerPool extends
-            ObjectPool<MessageConsumerResources> {
+    protected class MessageConsumerPool extends ObjectPool<MessageConsumerResources> {
 
         public MessageConsumerPool() {
             super(getConsumerCount());
@@ -50,9 +51,7 @@ public class DefaultConsumer extends Sjm
         @Override
         protected MessageConsumerResources createObject() throws Exception {
             MessageConsumerResources model = null;
-            if (isEndpointTransacted()
-                    || getSjmsEndpoint().getExchangePattern().equals(
-                            ExchangePattern.InOut)) {
+            if (isTransacted() || getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOut)) {
                 model = createConsumerWithDedicatedSession();
             } else {
                 model = createConsumerListener();
@@ -61,8 +60,7 @@ public class DefaultConsumer extends Sjm
         }
 
         @Override
-        protected void destroyObject(MessageConsumerResources model)
-            throws Exception {
+        protected void destroyObject(MessageConsumerResources model) throws Exception {
             if (model != null) {
                 if (model.getMessageConsumer() != null) {
                     if (model.getMessageConsumer().getMessageListener() != null) {
@@ -90,24 +88,20 @@ public class DefaultConsumer extends Sjm
         private final MessageConsumer messageConsumer;
 
         /**
-         * TODO Add Constructor Javadoc
-         * 
-         * @param session
          * @param messageProducer
          */
         public MessageConsumerResources(MessageConsumer messageConsumer) {
+            super();
             this.session = null;
             this.messageConsumer = messageConsumer;
         }
 
         /**
-         * TODO Add Constructor Javadoc
-         * 
          * @param session
          * @param messageProducer
          */
-        public MessageConsumerResources(Session session,
-                MessageConsumer messageConsumer) {
+        public MessageConsumerResources(Session session, MessageConsumer messageConsumer) {
+            super();
             this.session = session;
             this.messageConsumer = messageConsumer;
         }
@@ -135,8 +129,7 @@ public class DefaultConsumer extends Sjm
 
     public DefaultConsumer(SjmsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.executor = endpoint.getCamelContext().getExecutorServiceManager()
-                .newDefaultThreadPool(this, "SjmsConsumer");
+        this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
     }
 
     @Override
@@ -170,7 +163,7 @@ public class DefaultConsumer extends Sjm
     private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
         Connection conn = getConnectionResource().borrowConnection();
         Session session = null;
-        if (isEndpointTransacted()) {
+        if (isTransacted()) {
             session = conn.createSession(true, Session.SESSION_TRANSACTED);
         } else {
             session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -205,26 +198,30 @@ public class DefaultConsumer extends Sjm
     /**
      * Helper factory method used to create a MessageListener based on the MEP
      * 
-     * @param session
-     *            a session is only required if we are a transacted consumer
+     * @param session a session is only required if we are a transacted consumer
      * @return
      */
     protected MessageListener createMessageHandler(Session session) {
+        
+        TransactionCommitStrategy commitStrategy = null;
+        if (this.getCommitStrategy() != null) {
+            commitStrategy = this.getCommitStrategy();
+        } else if (this.getTransactionBatchCount() > 0) {
+            commitStrategy = new BatchTransactionCommitStrategy(this.getTransactionBatchCount());
+        } else {
+            commitStrategy = new DefaultTransactionCommitStrategy();
+        }
+        
         DefaultMessageHandler messageHandler = null;
-        if (getSjmsEndpoint().getExchangePattern().equals(
-                ExchangePattern.InOnly)) {
-            if (isEndpointTransacted()) {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(),
-                        executor,
-                        new SessionTransactionSynchronization(session));
+        if (getSjmsEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
+            if (isTransacted()) {
+                messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy));
             } else {
                 messageHandler = new InOnlyMessageHandler(getEndpoint(), executor);
             }
         } else {
-            if (isEndpointTransacted()) {
-                messageHandler = new InOutMessageHandler(getEndpoint(),
-                        executor,
-                        new SessionTransactionSynchronization(session));
+            if (isTransacted()) {
+                messageHandler = new InOutMessageHandler(getEndpoint(), executor, new SessionTransactionSynchronization(session, commitStrategy));
             } else {
                 messageHandler = new InOutMessageHandler(getEndpoint(), executor);
             }
@@ -232,7 +229,7 @@ public class DefaultConsumer extends Sjm
         messageHandler.setSession(session);
         messageHandler.setProcessor(getAsyncProcessor());
         messageHandler.setSynchronous(isSynchronous());
-        messageHandler.setTransacted(isEndpointTransacted());
+        messageHandler.setTransacted(isTransacted());
         messageHandler.setTopic(isTopic());
         return messageHandler;
     }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/DefaultMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -18,7 +18,6 @@ package org.apache.camel.component.sjms.
 
 import java.util.concurrent.ExecutorService;
 
-import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -27,12 +26,11 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.ObjectHelper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,34 +38,41 @@ import static org.apache.camel.util.Obje
 
 /**
  * TODO Add Class documentation for DefaultMessageHandler
- * 
  */
 public abstract class DefaultMessageHandler implements MessageListener {
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
-    
+
     private final ExecutorService executor;
-    
+
     private Endpoint endpoint;
     private AsyncProcessor processor;
     private Session session;
     private boolean transacted;
-    private SessionAcknowledgementType acknowledgementType = SessionAcknowledgementType.AUTO_ACKNOWLEDGE;
     private boolean synchronous = true;
-    private Destination namedReplyTo;
     private Synchronization synchronization;
     private boolean topic;
+    private TransactionCommitStrategy commitStrategy;
 
     public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor) {
-        this(endpoint, executor, null);
+        this.endpoint = endpoint;
+        this.executor = executor;
     }
 
     public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
+        super();
         this.synchronization = synchronization;
         this.endpoint = endpoint;
         this.executor = executor;
     }
 
+    public DefaultMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+        super();
+        this.endpoint = endpoint;
+        this.executor = executor;
+        this.commitStrategy = commitStrategy;
+    }
+
     @Override
     public void onMessage(Message message) {
         handleMessage(message);
@@ -79,24 +84,19 @@ public abstract class DefaultMessageHand
     private void handleMessage(Message message) {
         RuntimeCamelException rce = null;
         try {
-            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper
-                    .createExchange(message, getEndpoint());
-            if (log.isDebugEnabled()) {
-                log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
-            }
+            final DefaultExchange exchange = (DefaultExchange)JmsMessageExchangeHelper.createExchange(message, getEndpoint());
+            
+            log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+            
             if (isTransacted() && synchronization != null) {
                 exchange.addOnCompletion(synchronization);
             }
             try {
                 if (isTransacted() || isSynchronous()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("  Handling synchronous message: {}", exchange.getIn().getBody());
-                    }
+                    log.debug("  Handling synchronous message: {}", exchange.getIn().getBody());
                     doHandleMessage(exchange);
                 } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("  Handling asynchronous message: {}", exchange.getIn().getBody());
-                    }
+                    log.debug("  Handling asynchronous message: {}", exchange.getIn().getBody());
                     executor.execute(new Runnable() {
                         @Override
                         public void run() {
@@ -128,7 +128,7 @@ public abstract class DefaultMessageHand
     }
 
     public abstract void doHandleMessage(final Exchange exchange);
-    
+
     public abstract void close();
 
     public void setTransacted(boolean transacted) {
@@ -151,15 +151,6 @@ public abstract class DefaultMessageHand
         this.processor = processor;
     }
 
-    public SessionAcknowledgementType getAcknowledgementType() {
-        return acknowledgementType;
-    }
-
-    public void setAcknowledgementType(
-            SessionAcknowledgementType acknowledgementType) {
-        this.acknowledgementType = acknowledgementType;
-    }
-
     public void setSession(Session session) {
         this.session = session;
     }
@@ -176,14 +167,6 @@ public abstract class DefaultMessageHand
         return synchronous;
     }
 
-    public void setNamedReplyTo(Destination namedReplyToDestination) {
-        this.namedReplyTo = namedReplyToDestination;
-    }
-
-    public Destination getNamedReplyTo() {
-        return namedReplyTo;
-    }
-
     public void setTopic(boolean topic) {
         this.topic = topic;
     }
@@ -191,4 +174,8 @@ public abstract class DefaultMessageHand
     public boolean isTopic() {
         return topic;
     }
+
+    public TransactionCommitStrategy getCommitStrategy() {
+        return commitStrategy;
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -21,27 +21,27 @@ import java.util.concurrent.ExecutorServ
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.AsyncProcessorHelper;
 
 /**
- * TODO Add Class documentation for DefaultMessageHandler
+ * An InOnly {@link DefaultMessageHandler}
  * 
  */
 public class InOnlyMessageHandler extends DefaultMessageHandler {
 
     /**
-     * 
      * @param endpoint
-     * @param processor
+     * @param executor
      */
     public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor) {
-        this(endpoint, executor, null);
+        super(endpoint, executor);
     }
 
     /**
-     * 
-     * @param stopped
+     * @param endpoint
+     * @param executor
      * @param synchronization
      */
     public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
@@ -49,6 +49,16 @@ public class InOnlyMessageHandler extend
     }
 
     /**
+     * @param endpoint
+     * @param executor
+     * @param commitStrategy
+     * @param rollbackStrategy
+     */
+    public InOnlyMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+        super(endpoint, executor, commitStrategy);
+    }
+
+    /**
      * @param message
      */
     @Override
@@ -61,6 +71,7 @@ public class InOnlyMessageHandler extend
         } else {
             NoOpAsyncCallback callback = new NoOpAsyncCallback();
             if (isTransacted() || isSynchronous()) {
+                //SessionTransactionAsyncCallback callback = new SessionTransactionAsyncCallback(exchange, getSession(), getCommitStrategy());
                 // must process synchronous if transacted or configured to
                 // do so
                 if (log.isDebugEnabled()) {
@@ -75,11 +86,10 @@ public class InOnlyMessageHandler extend
                 }
             } else {
                 // process asynchronous using the async routing engine
-                if (log.isDebugEnabled()) {
-                    log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
-                }
-                boolean sync = AsyncProcessorHelper.process(getProcessor(),
-                        exchange, callback);
+                log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+                boolean sync = false;
+
+                sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
                 if (!sync) {
                     // will be done async so return now
                     return;
@@ -87,7 +97,7 @@ public class InOnlyMessageHandler extend
             }
         }
     }
-    
+
     @Override
     public void close() {
         // no-op

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
+import java.io.InputStream;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
@@ -32,42 +33,58 @@ import javax.jms.Topic;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
- * TODO Add Class documentation for DefaultMessageHandler
- * TODO Create a producer cache manager to store and purge unused cashed producers or we will have a memory leak
- * 
+ * TODO Add Class documentation for DefaultMessageHandler 
+ * TODO Create a producer
+ * cache manager to store and purge unused cashed producers or we will have a
+ * memory leak
  */
 public class InOutMessageHandler extends DefaultMessageHandler {
 
     private Map<String, MessageProducer> producerCache = new TreeMap<String, MessageProducer>();
     private ReadWriteLock lock = new ReentrantReadWriteLock();
 
+    
     /**
      * TODO Add Constructor Javadoc
-     * 
+     *
      * @param endpoint
-     * @param processor
+     * @param executor
      */
     public InOutMessageHandler(Endpoint endpoint, ExecutorService executor) {
-        this(endpoint, executor, null);
+        super(endpoint, executor);
     }
-
+    
     /**
      * TODO Add Constructor Javadoc
-     * 
-     * @param stopped
+     *
+     * @param endpoint
+     * @param executor
      * @param synchronization
      */
     public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
         super(endpoint, executor, synchronization);
     }
-    
+
+    /**
+     * TODO Add Constructor Javadoc
+     *
+     * @param endpoint
+     * @param executor
+     * @param commitStrategy
+     * @param rollbackStrategy
+     */
+    public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, TransactionCommitStrategy commitStrategy) {
+        super(endpoint, executor, commitStrategy);
+    }
+
     /**
      * @param message
      */
@@ -83,8 +100,7 @@ public class InOutMessageHandler extends
                 } else if (obj instanceof String) {
                     replyTo = JmsObjectFactory.createDestination(getSession(), (String)obj, isTopic());
                 } else {
-                    throw new Exception("The value of JMSReplyTo must be a valid Destination or String.  Value provided: "
-                                            + obj);
+                    throw new Exception("The value of JMSReplyTo must be a valid Destination or String.  Value provided: " + obj);
                 }
 
                 String destinationName = getDestinationName(replyTo);
@@ -106,7 +122,7 @@ public class InOutMessageHandler extends
                     }
                 }
             }
-            
+
             MessageHanderAsyncCallback callback = new MessageHanderAsyncCallback(exchange, messageProducer);
             if (exchange.isFailed()) {
                 return;
@@ -114,9 +130,7 @@ public class InOutMessageHandler extends
                 if (isTransacted() || isSynchronous()) {
                     // must process synchronous if transacted or configured to
                     // do so
-                    if (log.isDebugEnabled()) {
-                        log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
-                    }
+                    log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
                     try {
                         AsyncProcessorHelper.process(getProcessor(), exchange);
                     } catch (Exception e) {
@@ -126,9 +140,7 @@ public class InOutMessageHandler extends
                     }
                 } else {
                     // process asynchronous using the async routing engine
-                    if (log.isDebugEnabled()) {
-                        log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
-                    }
+                    log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
                     boolean sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
                     if (!sync) {
                         // will be done async so return now
@@ -141,11 +153,10 @@ public class InOutMessageHandler extends
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("SjmsMessageConsumer invoked for Exchange id:{} ",
-                    exchange.getExchangeId());
+            log.debug("SjmsMessageConsumer invoked for Exchange id:{} ", exchange.getExchangeId());
         }
     }
-    
+
     @Override
     public void close() {
         for (String key : producerCache.keySet()) {
@@ -158,11 +169,11 @@ public class InOutMessageHandler extends
         }
         producerCache.clear();
     }
-    
+
     private boolean isDestination(Object object) {
         return object instanceof Destination;
     }
-    
+
     private String getDestinationName(Destination destination) throws Exception {
         String answer = null;
         if (destination instanceof Queue) {
@@ -180,6 +191,7 @@ public class InOutMessageHandler extends
         private MessageProducer localProducer;
 
         public MessageHanderAsyncCallback(Exchange exchange, MessageProducer localProducer) {
+            super();
             this.exchange = exchange;
             this.localProducer = localProducer;
         }
@@ -188,10 +200,15 @@ public class InOutMessageHandler extends
         public void done(boolean sync) {
 
             try {
-                Message response = JmsMessageHelper.createMessage(exchange,
-                        getSession(), true);
-                response.setJMSCorrelationID(exchange.getIn().getHeader(
-                        "JMSCorrelationID", String.class)); 
+                Object body = exchange.getOut().getBody();
+                if (body != null) {
+                    if (body instanceof InputStream) {
+                        byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, body);
+                        exchange.getOut().setBody(bytes);
+                    }
+                }
+                Message response = JmsMessageExchangeHelper.createMessage(exchange, getSession(), true);
+                response.setJMSCorrelationID(exchange.getIn().getHeader("JMSCorrelationID", String.class));
                 localProducer.send(response);
             } catch (Exception e) {
                 exchange.setException(e);

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionFactoryResource.java Thu Aug 30 03:42:12 2012
@@ -34,6 +34,7 @@ public class ConnectionFactoryResource e
      * Default Constructor
      */
     public ConnectionFactoryResource() {
+        super();
     }
 
     /**

Copied: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java (from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java?p2=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java&p1=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageExchangeHelper.java Thu Aug 30 03:42:12 2012
@@ -57,11 +57,11 @@ import static org.apache.camel.util.Obje
  *
  * @version 
  */
-public final class JmsMessageHelper {
+public final class JmsMessageExchangeHelper {
     
-    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageExchangeHelper.class);
 
-    private JmsMessageHelper() {
+    private JmsMessageExchangeHelper() {
     }
     
     public static Exchange createExchange(Message message, Endpoint endpoint) {
@@ -72,7 +72,7 @@ public final class JmsMessageHelper {
     @SuppressWarnings("unchecked")
     public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
         try {
-            JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
+            JmsMessageExchangeHelper.setJmsMessageHeaders(message, exchange, out);
             if (message != null) {
                 // convert to JMS Message of the given type
 
@@ -82,7 +82,7 @@ public final class JmsMessageHelper {
                 } else {
                     bodyMessage = (DefaultMessage) exchange.getIn();
                 }
-                switch (JmsMessageHelper.discoverType(message)) {
+                switch (JmsMessageExchangeHelper.discoverType(message)) {
                 case Bytes:
                     BytesMessage bytesMessage = (BytesMessage) message;
                     if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
@@ -269,9 +269,7 @@ public final class JmsMessageHelper {
         try {
             message.setJMSReplyTo(replyTo);
         } catch (Exception e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
-            }
+            LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
         }
     }
 
@@ -387,7 +385,7 @@ public final class JmsMessageHelper {
             } else if (headerName.equalsIgnoreCase("JMSPriority")) {
                 jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
             } else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
-                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+                JmsMessageExchangeHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
             } else if (headerName.equalsIgnoreCase("JMSExpiration")) {
                 jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
             } else {
@@ -410,7 +408,7 @@ public final class JmsMessageHelper {
                 } else {
                     if (!(headerValue instanceof JmsMessageType)) {
                         String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
-                        JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+                        JmsMessageExchangeHelper.setProperty(jmsMessage, encodedName, headerValue);
                     }
                 }
             }            
@@ -492,10 +490,10 @@ public final class JmsMessageHelper {
                         jmsMessage.getJMSTimestamp());
                 headers.put(
                         JmsMessageHeaderType.JMSReplyTo.toString(), 
-                        JmsMessageHelper.getJMSReplyTo(jmsMessage));
+                        JmsMessageExchangeHelper.getJMSReplyTo(jmsMessage));
                 headers.put(
                         JmsMessageHeaderType.JMSType.toString(), 
-                        JmsMessageHelper.getJMSType(jmsMessage));
+                        JmsMessageExchangeHelper.getJMSType(jmsMessage));
 
                 // this works around a bug in the ActiveMQ property handling
                 headers.put(
@@ -537,7 +535,7 @@ public final class JmsMessageHelper {
             } else {
                 body = exchange.getIn().getBody();
             }
-            JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
+            JmsMessageType messageType = JmsMessageExchangeHelper.discoverType(exchange);
 
             switch (messageType) {
             case Bytes:
@@ -566,6 +564,7 @@ public final class JmsMessageHelper {
                 answer = textMessage;
                 break;
             default:
+                answer = session.createMessage();
                 break;
             }
         } catch (Exception e) {
@@ -573,9 +572,60 @@ public final class JmsMessageHelper {
             throw e;
         }
 
-        answer = JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
+        answer = JmsMessageExchangeHelper.setJmsMessageHeaders(exchange, answer);
         return answer;
     }
+//    
+//    @SuppressWarnings("unchecked")
+//    public static Message createMessage(Session session, Object payload, ) throws Exception {
+//        Message answer = null;
+//        Object body = null;
+//        try {
+//            if (out && exchange.getOut().getBody() != null) {
+//                body = exchange.getOut().getBody();
+//            } else {
+//                body = exchange.getIn().getBody();
+//            }
+//            JmsMessageType messageType = JmsMessageExchangeHelper.discoverType(exchange);
+//
+//            switch (messageType) {
+//            case Bytes:
+//                BytesMessage bytesMessage = session.createBytesMessage();
+//                bytesMessage.writeBytes((byte[])body);
+//                answer = bytesMessage;
+//                break;
+//            case Map:
+//                MapMessage mapMessage = session.createMapMessage();
+//                Map<String, Object> objMap = (Map<String, Object>)body;
+//                Set<String> keys = objMap.keySet();
+//                for (String key : keys) {
+//                    Object value = objMap.get(key);
+//                    mapMessage.setObject(key, value);
+//                }
+//                answer = mapMessage;
+//                break;
+//            case Object:
+//                ObjectMessage objectMessage = session.createObjectMessage();
+//                objectMessage.setObject((Serializable)body);
+//                answer = objectMessage;
+//                break;
+//            case Text:
+//                TextMessage textMessage = session.createTextMessage();
+//                textMessage.setText((String)body);
+//                answer = textMessage;
+//                break;
+//            default:
+//                break;
+//            }
+//        } catch (Exception e) {
+//            LOGGER.error("TODO Auto-generated catch block", e);
+//            throw e;
+//        }
+//
+//        answer = JmsMessageExchangeHelper.setJmsMessageHeaders(exchange, answer);
+//        return answer;
+//    }
+//
     
     private static boolean hasIllegalHeaderKey(String key) {
         if (key == null) {