You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/21 19:29:51 UTC

[camel] 02/06: CAMEL-11931: camel-jms - Add better support for Stream JMS message type

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 314322590adcf181348dcdb8ab8842fc289fbdf7
Author: Claus Ibsen <da...@apache.org>
AuthorDate: Fri Oct 20 16:48:29 2017 +0200

    CAMEL-11931: camel-jms - Add better support for Stream JMS message type
---
 .../camel-jms/src/main/docs/jms-component.adoc     |  6 ++--
 .../org/apache/camel/component/jms/JmsBinding.java | 16 ++++++----
 .../apache/camel/component/jms/JmsComponent.java   | 20 ++++++++++++
 .../camel/component/jms/JmsConfiguration.java      | 21 ++++++++++++-
 .../component/jms/JmsStreamMessageTypeTest.java    |  9 +++---
 .../jms/springboot/JmsComponentConfiguration.java  | 36 ++++++++++++++++++++++
 6 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/components/camel-jms/src/main/docs/jms-component.adoc b/components/camel-jms/src/main/docs/jms-component.adoc
index cf43483..351fdd0 100644
--- a/components/camel-jms/src/main/docs/jms-component.adoc
+++ b/components/camel-jms/src/main/docs/jms-component.adoc
@@ -199,7 +199,7 @@ about these properties by consulting the relevant Spring documentation.
 
 
 // component options: START
-The JMS component supports 78 options which are listed below.
+The JMS component supports 79 options which are listed below.
 
 
 
@@ -282,6 +282,7 @@ The JMS component supports 78 options which are listed below.
 | *subscriptionDurable* (consumer) | Set whether to make the subscription durable. The durable subscription name to be used can be specified through the subscriptionName property. Default is false. Set this to true to register a durable subscription typically in combination with a subscriptionName value (unless your message listener class name is good enough as subscription name). Only makes sense when listening to a topic (pub-sub domain) therefore this method switches the pubSubDomain  [...]
 | *subscriptionShared* (consumer) | Set whether to make the subscription shared. The shared subscription name to be used can be specified through the subscriptionName property. Default is false. Set this to true to register a shared subscription typically in combination with a subscriptionName value (unless your message listener class name is good enough as subscription name). Note that shared subscriptions may also be durable so this flag can (and often will) be combined with subscripti [...]
 | *subscriptionName* (consumer) | Set the name of a subscription to create. To be applied in case of a topic (pub-sub domain) with a shared or durable subscription. The subscription name needs to be unique within this client's JMS client id. Default is the class name of the specified message listener. Note: Only 1 concurrent consumer (which is the default of this message listener container) is allowed for each subscription except for a shared subscription (which requires JMS 2.0). |  | String
+| *streamMessageType Enabled* (producer) | Sets whether StreamMessage type is enabled or not. Message payloads of streaming kind such as files InputStream etc will either by sent as BytesMessage or StreamMessage. This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory. By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no  [...]
 | *headerFilterStrategy* (filter) | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |===
@@ -322,7 +323,7 @@ with the following path and query parameters:
 | *destinationName* | *Required* Name of the queue or topic to use as destination |  | String
 |===
 
-==== Query Parameters (89 parameters):
+==== Query Parameters (90 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -378,6 +379,7 @@ with the following path and query parameters:
 | *includeSentJMSMessageID* (producer) | Only applicable when sending to JMS destination using InOnly (eg fire and forget). Enabling this option will enrich the Camel Exchange with the actual JMSMessageID that was used by the JMS client when the message was sent to the JMS destination. | false | boolean
 | *replyToCacheLevelName* (producer) | Sets the cache level by name for the reply consumer when doing request/reply over JMS. This option only applies when using fixed reply queues (not temporary). Camel will by default use: CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName. And CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as IBM WebSphere may require to set the replyToCacheLevelName=CACHE_NONE to work. Note: If using temporary queues then CACHE [...]
 | *replyToDestinationSelector Name* (producer) | Sets the JMS Selector using the fixed name to be used so you can filter out your own replies from the others when using a shared queue (that is if you are not using a temporary reply queue). |  | String
+| *streamMessageTypeEnabled* (producer) | Sets whether StreamMessage type is enabled or not. Message payloads of streaming kind such as files InputStream etc will either by sent as BytesMessage or StreamMessage. This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory. By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no m [...]
 | *allowSerializedHeaders* (advanced) | Controls whether or not to include serialized headers. Applies only when link isTransferExchange() is true. This requires that the objects are serializable. Camel will exclude any non-serializable objects and log it at WARN level. | false | boolean
 | *asyncStartListener* (advanced) | Whether to startup the JmsConsumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then bew [...]
 | *asyncStopListener* (advanced) | Whether to stop the JmsConsumer message listener asynchronously when stopping a route. | false | boolean
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 3a8439f..2684ea8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -42,8 +42,6 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.IOHelper;
 import org.w3c.dom.Node;
 
 import org.apache.camel.CamelContext;
@@ -57,6 +55,7 @@ import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +65,7 @@ import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinati
 import static org.apache.camel.component.jms.JmsMessageType.Bytes;
 import static org.apache.camel.component.jms.JmsMessageType.Map;
 import static org.apache.camel.component.jms.JmsMessageType.Object;
+import static org.apache.camel.component.jms.JmsMessageType.Stream;
 import static org.apache.camel.component.jms.JmsMessageType.Text;
 
 /**
@@ -293,7 +293,7 @@ public class JmsBinding {
                             answer = answer instanceof MapMessage ? answer : null;
                         } else if (type == JmsMessageType.Object) {
                             answer = answer instanceof ObjectMessage ? answer : null;
-                        } else if (type == JmsMessageType.Stream) {
+                        } else if (type == Stream) {
                             answer = answer instanceof StreamMessage ? answer : null;
                         }
                     }
@@ -555,20 +555,24 @@ public class JmsBinding {
      * @return type or null if no mapping was possible
      */
     protected JmsMessageType getJMSMessageTypeForBody(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) {
+        boolean streamingEnabled = endpoint.getConfiguration().isStreamMessageTypeEnabled();
+
         JmsMessageType type = null;
         // let body determine the type
         if (body instanceof Node || body instanceof String) {
             type = Text;
-        } else if (body instanceof byte[] || body instanceof WrappedFile || body instanceof File || body instanceof Reader
-                || body instanceof InputStream || body instanceof ByteBuffer || body instanceof StreamCache) {
+        } else if (body instanceof byte[] || body instanceof ByteBuffer) {
             type = Bytes;
+        } else if (body instanceof WrappedFile || body instanceof File || body instanceof Reader
+                || body instanceof InputStream || body instanceof StreamCache) {
+            type = streamingEnabled ? Stream : Bytes;
         } else if (body instanceof Map) {
             type = Map;
         } else if (body instanceof Serializable) {
             type = Object;            
         } else if (exchange.getContext().getTypeConverter().tryConvertTo(File.class, body) != null
                 || exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) != null) {
-            type = Bytes;
+            type = streamingEnabled ? Stream : Bytes;
         }
         return type;
     }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index 3e03966..134f8fa 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -1213,6 +1213,26 @@ public class JmsComponent extends HeaderFilterStrategyComponent implements Appli
         getConfiguration().setSubscriptionName(subscriptionName);
     }
 
+
+    public boolean isStreamMessageTypeEnabled() {
+        return getConfiguration().isStreamMessageTypeEnabled();
+    }
+
+    /**
+     * Sets whether StreamMessage type is enabled or not.
+     * Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage.
+     * This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory.
+     * By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.
+     */
+    @Metadata(label = "producer,advanced", description = "Sets whether StreamMessage type is enabled or not."
+        + " Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage."
+        + " This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory."
+        + " By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.")
+    public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) {
+        getConfiguration().setStreamMessageTypeEnabled(streamMessageTypeEnabled);
+    }
+
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 27ac6b8..eb054ce 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -485,6 +485,12 @@ public class JmsConfiguration implements Cloneable {
         + " Requires a JMS 2.0 compatible message broker.")
     private boolean subscriptionShared;
 
+    @UriParam(label = "producer,advanced", description = "Sets whether StreamMessage type is enabled or not."
+        + " Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage."
+        + " This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory."
+        + " By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.")
+    private boolean streamMessageTypeEnabled;
+
     public JmsConfiguration() {
     }
 
@@ -2193,5 +2199,18 @@ public class JmsConfiguration implements Cloneable {
     public void setSubscriptionName(String subscriptionName) {
         this.subscriptionName = subscriptionName;
     }
-    
+
+    public boolean isStreamMessageTypeEnabled() {
+        return streamMessageTypeEnabled;
+    }
+
+    /**
+     * Sets whether StreamMessage type is enabled or not.
+     * Message payloads of streaming kind such as files, InputStream, etc will either by sent as BytesMessage or StreamMessage.
+     * This option controls which kind will be used. By default BytesMessage is used which enforces the entire message payload to be read into memory.
+     * By enabling this option the message payload is read into memory in chunks and each chunk is then written to the StreamMessage until no more data.
+     */
+    public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) {
+        this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+    }
 }
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
index d9ee858..72d3d64 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
@@ -28,9 +28,6 @@ import org.junit.Test;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
-/**
- * @version 
- */
 public class JmsStreamMessageTypeTest extends CamelTestSupport {
 
     @Override
@@ -43,7 +40,9 @@ public class JmsStreamMessageTypeTest extends CamelTestSupport {
         CamelContext camelContext = super.createCamelContext();
 
         ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
-        camelContext.addComponent("jms", jmsComponentAutoAcknowledge(connectionFactory));
+        JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
+        jms.setStreamMessageTypeEnabled(true); // turn on streaming
+        camelContext.addComponent("jms", jms);
         return camelContext;
     }
 
@@ -68,7 +67,7 @@ public class JmsStreamMessageTypeTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("file:target/stream/in").to("jms:queue:foo?jmsMessageType=Stream");
+                from("file:target/stream/in").to("jms:queue:foo");
 
                 from("jms:queue:foo").to("file:target/stream/out").to("mock:result");
             }
diff --git a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
index 66f737b..4e76189 100644
--- a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
@@ -599,6 +599,16 @@ public class JmsComponentConfiguration
      */
     private String subscriptionName;
     /**
+     * Sets whether StreamMessage type is enabled or not. Message payloads of
+     * streaming kind such as files InputStream etc will either by sent as
+     * BytesMessage or StreamMessage. This option controls which kind will be
+     * used. By default BytesMessage is used which enforces the entire message
+     * payload to be read into memory. By enabling this option the message
+     * payload is read into memory in chunks and each chunk is then written to
+     * the StreamMessage until no more data.
+     */
+    private Boolean streamMessageTypeEnabled = false;
+    /**
      * To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter
      * header to and from Camel message.
      */
@@ -1233,6 +1243,14 @@ public class JmsComponentConfiguration
         this.subscriptionName = subscriptionName;
     }
 
+    public Boolean getStreamMessageTypeEnabled() {
+        return streamMessageTypeEnabled;
+    }
+
+    public void setStreamMessageTypeEnabled(Boolean streamMessageTypeEnabled) {
+        this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+    }
+
     public HeaderFilterStrategy getHeaderFilterStrategy() {
         return headerFilterStrategy;
     }
@@ -1918,6 +1936,16 @@ public class JmsComponentConfiguration
          * for a shared subscription (which requires JMS 2.0).
          */
         private String subscriptionName;
+        /**
+         * Sets whether StreamMessage type is enabled or not. Message payloads
+         * of streaming kind such as files, InputStream, etc will either by sent
+         * as BytesMessage or StreamMessage. This option controls which kind
+         * will be used. By default BytesMessage is used which enforces the
+         * entire message payload to be read into memory. By enabling this
+         * option the message payload is read into memory in chunks and each
+         * chunk is then written to the StreamMessage until no more data.
+         */
+        private Boolean streamMessageTypeEnabled = false;
 
         public ConsumerType getConsumerType() {
             return consumerType;
@@ -2664,5 +2692,13 @@ public class JmsComponentConfiguration
         public void setSubscriptionName(String subscriptionName) {
             this.subscriptionName = subscriptionName;
         }
+
+        public Boolean getStreamMessageTypeEnabled() {
+            return streamMessageTypeEnabled;
+        }
+
+        public void setStreamMessageTypeEnabled(Boolean streamMessageTypeEnabled) {
+            this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+        }
     }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.