You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:51:07 UTC

[16/50] [abbrv] camel git commit: Allow sjms endpoints to be used without requiring prior initialization of the component with a connection factory to make this endpoint xml friendly. The new endpoint usage is also friendly to context restart

Allow sjms endpoints to be used without requiring prior initialization of the component with a connection factory to make this endpoint xml friendly. The new endpoint usage is also friendly to context restart


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8137ea4a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8137ea4a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8137ea4a

Branch: refs/heads/kube-lb
Commit: 8137ea4a2a471e373b23a561e6da0780e9007e10
Parents: 0fb326d
Author: near-ethic <da...@gmail.com>
Authored: Mon Apr 25 13:25:12 2016 +0300
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200

----------------------------------------------------------------------
 components/camel-sjms/src/main/docs/sjms.adoc   |  9 ++-
 .../camel/component/sjms/SjmsComponent.java     | 29 --------
 .../camel/component/sjms/SjmsEndpoint.java      | 68 ++++++++++++++++++
 .../sjms/batch/SjmsBatchComponent.java          |  4 ++
 .../SjmsEndpointConnectionSettingsTest.java     | 72 ++++++++++++++++++++
 5 files changed, 152 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8137ea4a/components/camel-sjms/src/main/docs/sjms.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms.adoc b/components/camel-sjms/src/main/docs/sjms.adoc
index 5d18f9c..56f2e49 100644
--- a/components/camel-sjms/src/main/docs/sjms.adoc
+++ b/components/camel-sjms/src/main/docs/sjms.adoc
@@ -123,8 +123,10 @@ The Simple JMS component supports 9 options which are listed below.
 
 
 
+
+
 // endpoint options: START
-The Simple JMS component supports 29 endpoint options which are listed below:
+The Simple JMS component supports 32 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -147,6 +149,9 @@ The Simple JMS component supports 29 endpoint options which are listed below:
 | responseTimeOut | producer (advanced) | 5000 | long | Sets the amount of time we should wait before timing out a InOut response.
 | asyncStartListener | advanced | false | boolean | Whether to startup the consumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then beware that if the connection could not be established then an exception is logged at WARN level and the consumer will not be able to receive messages; You can then restart the route to retry.
 | asyncStopListener | advanced | false | boolean | Whether to stop the consumer message listener asynchronously when stopping a route.
+| connectionCount | advanced |  | Integer | The maximum number of connections available to this endpoint
+| connectionFactory | advanced |  | ConnectionFactory | Initializes the connectionFactory for the endpoint which takes precedence over the component's connectionFactory if any
+| connectionResource | advanced |  | ConnectionResource | Initializes the connectionResource for the endpoint which takes precedence over the component's connectionResource if any
 | destinationCreationStrategy | advanced |  | DestinationCreationStrategy | To use a custom DestinationCreationStrategy.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | headerFilterStrategy | advanced |  | HeaderFilterStrategy | To use a custom HeaderFilterStrategy to filter header to and from Camel message.
@@ -162,6 +167,8 @@ The Simple JMS component supports 29 endpoint options which are listed below:
 // endpoint options: END
 
 
+
+
 Below is an example of how to configure the `SjmsComponent` with its
 required `ConnectionFactory` provider. It will create a single connection
 by default and store it using the components internal pooling APIs to

http://git-wip-us.apache.org/repos/asf/camel/blob/8137ea4a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index e1132bf..2294f55 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -23,7 +23,6 @@ import javax.jms.ConnectionFactory;
 import org.apache.camel.CamelException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
@@ -33,18 +32,14 @@ import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The <a href="http://camel.apache.org/sjms">Simple JMS</a> component.
  */
 public class SjmsComponent extends UriEndpointComponent implements HeaderFilterStrategyAware {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SjmsComponent.class);
 
     private ConnectionFactory connectionFactory;
     private ConnectionResource connectionResource;
-    private volatile boolean closeConnectionResource;
     private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
     private JmsKeyFormatStrategy jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
     private Integer connectionCount = 1;
@@ -106,21 +101,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
         timedTaskManager = new TimedTaskManager();
-
-        LOGGER.trace("Verify ConnectionResource");
-        if (getConnectionResource() == null) {
-            LOGGER.debug("No ConnectionResource provided. Initialize the ConnectionFactoryResource.");
-            // We always use a connection pool, even for a pool of 1
-            ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
-            connections.fillPool();
-            setConnectionResource(connections);
-            // we created the resource so we should close it when stopping
-            closeConnectionResource = true;
-        } else if (getConnectionResource() instanceof ConnectionFactoryResource) {
-            ((ConnectionFactoryResource) getConnectionResource()).fillPool();
-        }
     }
 
     @Override
@@ -129,16 +110,6 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
             timedTaskManager.cancelTasks();
             timedTaskManager = null;
         }
-
-        if (closeConnectionResource) {
-            if (getConnectionResource() != null) {
-                if (getConnectionResource() instanceof ConnectionFactoryResource) {
-                    ((ConnectionFactoryResource) getConnectionResource()).drainPool();
-                }
-            }
-            connectionResource = null;
-        }
-
         super.doStop();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8137ea4a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 272e816..f6d5ce9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.sjms;
 
+import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.Session;
 
@@ -27,6 +28,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
 import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
@@ -45,6 +47,7 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.EndpointHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,6 +118,13 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
     private MessageCreatedStrategy messageCreatedStrategy;
     @UriParam(label = "advanced")
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
+    @UriParam(label = "advanced")
+    private ConnectionResource connectionResource;
+    @UriParam(label = "advanced")
+    private ConnectionFactory connectionFactory;
+    @UriParam(label = "advanced")
+    private Integer connectionCount;
+    private volatile boolean closeConnectionResource;
 
     public SjmsEndpoint() {
     }
@@ -134,10 +144,29 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+        if (getConnectionResource() == null) {
+            if (getConnectionFactory() != null) {
+                // We always use a connection pool, even for a pool of 1
+                ConnectionFactoryResource connections = new ConnectionFactoryResource(getConnectionCount(), getConnectionFactory());
+                connections.fillPool();
+                connectionResource = connections;
+                // we created the resource so we should close it when stopping
+                closeConnectionResource = true;
+            }
+        } else if (getConnectionResource() instanceof ConnectionFactoryResource) {
+            ((ConnectionFactoryResource) getConnectionResource()).fillPool();
+        }
     }
 
     @Override
     protected void doStop() throws Exception {
+        if (closeConnectionResource) {
+            if (connectionResource instanceof ConnectionFactoryResource) {
+                ((ConnectionFactoryResource) getConnectionResource()).drainPool();
+            }
+            closeConnectionResource = false;
+            connectionResource = null;
+        }
         super.doStop();
     }
 
@@ -236,9 +265,20 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
     }
 
     public ConnectionResource getConnectionResource() {
+        if (connectionResource != null) {
+            return connectionResource;
+        }
         return getComponent().getConnectionResource();
     }
 
+    /**
+     * Initializes the connectionResource for the endpoint, which takes precedence over the component's connectionResource, if any
+     */
+    public void setConnectionResource(String connectionResource) {
+        this.connectionResource = EndpointHelper.resolveReferenceParameter(getCamelContext(), connectionResource, ConnectionResource.class);
+    }
+
+
     public boolean isSynchronous() {
         return synchronous;
     }
@@ -531,4 +571,32 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
         this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
     }
 
+    /**
+     * Initializes the connectionFactory for the endpoint, which takes precedence over the component's connectionFactory, if any
+     */
+    public void setConnectionFactory(String connectionFactory) {
+        this.connectionFactory = EndpointHelper.resolveReferenceParameter(getCamelContext(), connectionFactory, ConnectionFactory.class);
+
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        if (connectionFactory != null) {
+            return connectionFactory;
+        }
+        return getComponent().getConnectionFactory();
+    }
+
+    public int getConnectionCount() {
+        if (connectionCount != null) {
+            return connectionCount;
+        }
+        return getComponent().getConnectionCount();
+    }
+
+    /**
+     * The maximum number of connections available to this endpoint
+     */
+    public void setConnectionCount(Integer connectionCount) {
+        this.connectionCount = connectionCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8137ea4a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 421fd8a..9029468 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -33,6 +33,10 @@ public class SjmsBatchComponent extends UriEndpointComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ConnectionFactory cf = resolveAndRemoveReferenceParameter(parameters, "connectionFactory", ConnectionFactory.class);
+        if(cf != null){
+            setConnectionFactory(cf);
+        }
         ObjectHelper.notNull(connectionFactory, "connectionFactory");
         SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
         setProperties(sjmsBatchEndpoint, parameters);

http://git-wip-us.apache.org/repos/asf/camel/blob/8137ea4a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
new file mode 100644
index 0000000..25045b1
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.util.Random;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class SjmsEndpointConnectionSettingsTest extends CamelTestSupport {
+    private static final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
+    private static final ConnectionResource connectionResource = new ConnectionFactoryResource(2, connectionFactory);
+
+    @Test
+    public void testConnectionFactory() {
+        Endpoint endpoint = context.getEndpoint("sjms:queue:test?connectionFactory=activemq");
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        assertEquals(connectionFactory, qe.getConnectionFactory());
+    }
+
+    @Test
+    public void testConnectionResource() {
+        Endpoint endpoint = context.getEndpoint("sjms:queue:test?connectionResource=connresource");
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        assertEquals(connectionResource, qe.getConnectionResource());
+    }
+
+    @Test
+    public void testConnectionCount() {
+        Random random = new Random();
+        int poolSize = random.nextInt(100);
+        Endpoint endpoint = context.getEndpoint("sjms:queue:test?connectionCount=" + poolSize);
+        assertNotNull(endpoint);
+        assertTrue(endpoint instanceof SjmsEndpoint);
+        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        assertEquals(poolSize, qe.getConnectionCount());
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("activemq", connectionFactory);
+        registry.put("connresource", connectionResource);
+        return new DefaultCamelContext(registry);
+    }
+}