You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/07/13 15:28:28 UTC

[camel] branch camel-2.21.x updated: CAMEL-12624: ActiveMQ Artemis AMQP integration issue with topic prefix hardcode

This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.21.x by this push:
     new 1e5b777  CAMEL-12624: ActiveMQ Artemis AMQP integration issue with topic prefix hardcode
1e5b777 is described below

commit 1e5b7772d74c90cd78625461ec052a98f5437f97
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Fri Jul 13 17:57:44 2018 +0300

    CAMEL-12624: ActiveMQ Artemis AMQP integration issue with topic prefix
    hardcode
---
 components/camel-amqp/pom.xml                      |  12 +++
 .../apache/camel/component/amqp/AMQPComponent.java |   4 +-
 .../component/amqp/AMQPConnectionDetails.java      |  19 +++-
 .../amqp/artemis/AMQPEmbeddedBrokerTest.java       | 117 +++++++++++++++++++++
 4 files changed, 150 insertions(+), 2 deletions(-)

diff --git a/components/camel-amqp/pom.xml b/components/camel-amqp/pom.xml
index eb24b66..75282c9 100644
--- a/components/camel-amqp/pom.xml
+++ b/components/camel-amqp/pom.xml
@@ -98,6 +98,18 @@
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
+      <artifactId>artemis-server</artifactId>
+      <version>${activemq-artemis-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>artemis-amqp-protocol</artifactId>
+      <version>${activemq-artemis-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-amqp</artifactId>
       <scope>test</scope>
     </dependency>
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
index 43aebab..cc1bda6 100644
--- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
@@ -56,7 +56,9 @@ public class AMQPComponent extends JmsComponent {
         if (connectionDetails.size() == 1) {
             AMQPConnectionDetails details = connectionDetails.iterator().next();
             JmsConnectionFactory connectionFactory = new JmsConnectionFactory(details.username(), details.password(), details.uri());
-            connectionFactory.setTopicPrefix("topic://");
+            if (details.setTopicPrefix()) {
+                connectionFactory.setTopicPrefix("topic://");
+            }
             setConnectionFactory(connectionFactory);
         }
         super.doStart();
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
index 029e17b..66732e6 100644
--- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
@@ -28,17 +28,29 @@ public class AMQPConnectionDetails {
     public static final String AMQP_USERNAME = "AMQP_SERVICE_USERNAME";
 
     public static final String AMQP_PASSWORD = "AMQP_SERVICE_PASSWORD";
+    
+    public static final String AMQP_SET_TOPIC_PREFIX = "AMQP_SET_TOPIC_PREFIX";
 
     private final String uri;
 
     private final String username;
 
     private final String password;
+    
+    private final boolean setTopicPrefix;
 
     public AMQPConnectionDetails(String uri, String username, String password) {
         this.uri = uri;
         this.username = username;
         this.password = password;
+        this.setTopicPrefix = true; 
+    }
+    
+    public AMQPConnectionDetails(String uri, String username, String password, boolean setTopicPrefix) {
+        this.uri = uri;
+        this.username = username;
+        this.password = password;
+        this.setTopicPrefix = setTopicPrefix;
     }
 
     public AMQPConnectionDetails(String uri) {
@@ -53,8 +65,9 @@ public class AMQPConnectionDetails {
             int port = Integer.parseInt(property(propertiesComponent, AMQP_PORT, "5672"));
             String username = property(propertiesComponent, AMQP_USERNAME, null);
             String password = property(propertiesComponent, AMQP_PASSWORD, null);
+            boolean setTopicPrefix = Boolean.parseBoolean(property(propertiesComponent, AMQP_SET_TOPIC_PREFIX, "true"));
 
-            return new AMQPConnectionDetails("amqp://" + host + ":" + port, username, password);
+            return new AMQPConnectionDetails("amqp://" + host + ":" + port, username, password, setTopicPrefix);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -71,6 +84,10 @@ public class AMQPConnectionDetails {
     public String password() {
         return password;
     }
+    
+    public boolean setTopicPrefix() {
+        return setTopicPrefix;
+    }
 
     // Helpers
 
diff --git a/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java b/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java
new file mode 100644
index 0000000..563aa46
--- /dev/null
+++ b/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.amqp.artemis;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.amqp.AMQPComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.AMQP_PORT;
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.AMQP_SET_TOPIC_PREFIX;
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.discoverAMQP;
+
+public class AMQPEmbeddedBrokerTest extends CamelTestSupport {
+    
+    static int amqpPort = AvailablePortFinder.getNextAvailable();
+    
+    static EmbeddedActiveMQ server = new EmbeddedActiveMQ();
+    
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint resultEndpoint;
+
+    String expectedBody = "Hello there!";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        Configuration config = new ConfigurationImpl();
+        AddressSettings addressSettings = new AddressSettings();
+        // Disable automatic address creation to make sure the topic name is used as-is, without a prefix
+        addressSettings.setAutoCreateAddresses(false);
+        config.addAcceptorConfiguration("amqp", "tcp://0.0.0.0:" + amqpPort 
+                                        + "?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300");
+        config.setPersistenceEnabled(false);
+        config.addAddressesSetting("#", addressSettings);
+        config.setSecurityEnabled(false);
+        
+        // Explicitly register the multicast address for the topic used by the test routes
+        CoreAddressConfiguration pingTopicConfig = new CoreAddressConfiguration();
+        pingTopicConfig.setName("topic.ping");
+        pingTopicConfig.addRoutingType(RoutingType.MULTICAST);
+        
+        config.addAddressConfiguration(pingTopicConfig);
+        
+        server.setConfiguration(config);
+        server.start();
+        System.setProperty(AMQP_PORT, amqpPort + "");
+        System.setProperty(AMQP_SET_TOPIC_PREFIX, "false");
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        server.stop();
+    }
+    
+    @Test
+    public void testTopicWithoutPrefix() throws Exception {
+        resultEndpoint.expectedMessageCount(1);
+        template.sendBody("direct:send-topic", expectedBody);
+        resultEndpoint.assertIsSatisfied();
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        return registry;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        JndiRegistry registry = (JndiRegistry)((PropertyPlaceholderDelegateRegistry)camelContext.getRegistry()).getRegistry();
+        registry.bind("amqpConnection", discoverAMQP(camelContext));
+        camelContext.addComponent("amqp-customized", new AMQPComponent());
+        return camelContext;
+    }
+    
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:send-topic")
+                    .to("amqp-customized:topic:topic.ping");
+                
+                from("amqp-customized:topic:topic.ping")
+                    .to("log:routing")
+                    .to("mock:result");
+            }
+        };
+    }
+}