You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/07/13 15:28:28 UTC
[camel] branch camel-2.21.x updated: CAMEL-12624: ActiveMQ Artemis
AMQP integration issue with topic prefix hardcode
This is an automated email from the ASF dual-hosted git repository.
dmvolod pushed a commit to branch camel-2.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.21.x by this push:
new 1e5b777 CAMEL-12624: ActiveMQ Artemis AMQP integration issue with topic prefix hardcode
1e5b777 is described below
commit 1e5b7772d74c90cd78625461ec052a98f5437f97
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Fri Jul 13 17:57:44 2018 +0300
CAMEL-12624: ActiveMQ Artemis AMQP integration issue with topic prefix
hardcode
---
components/camel-amqp/pom.xml | 12 +++
.../apache/camel/component/amqp/AMQPComponent.java | 4 +-
.../component/amqp/AMQPConnectionDetails.java | 19 +++-
.../amqp/artemis/AMQPEmbeddedBrokerTest.java | 117 +++++++++++++++++++++
4 files changed, 150 insertions(+), 2 deletions(-)
diff --git a/components/camel-amqp/pom.xml b/components/camel-amqp/pom.xml
index eb24b66..75282c9 100644
--- a/components/camel-amqp/pom.xml
+++ b/components/camel-amqp/pom.xml
@@ -98,6 +98,18 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-server</artifactId>
+ <version>${activemq-artemis-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-amqp-protocol</artifactId>
+ <version>${activemq-artemis-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>activemq-amqp</artifactId>
<scope>test</scope>
</dependency>
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
index 43aebab..cc1bda6 100644
--- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPComponent.java
@@ -56,7 +56,9 @@ public class AMQPComponent extends JmsComponent {
if (connectionDetails.size() == 1) {
AMQPConnectionDetails details = connectionDetails.iterator().next();
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(details.username(), details.password(), details.uri());
- connectionFactory.setTopicPrefix("topic://");
+ if (details.setTopicPrefix()) {
+ connectionFactory.setTopicPrefix("topic://");
+ }
setConnectionFactory(connectionFactory);
}
super.doStart();
diff --git a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
index 029e17b..66732e6 100644
--- a/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
+++ b/components/camel-amqp/src/main/java/org/apache/camel/component/amqp/AMQPConnectionDetails.java
@@ -28,17 +28,29 @@ public class AMQPConnectionDetails {
public static final String AMQP_USERNAME = "AMQP_SERVICE_USERNAME";
public static final String AMQP_PASSWORD = "AMQP_SERVICE_PASSWORD";
+
+ public static final String AMQP_SET_TOPIC_PREFIX = "AMQP_SET_TOPIC_PREFIX";
private final String uri;
private final String username;
private final String password;
+
+ private final boolean setTopicPrefix;
public AMQPConnectionDetails(String uri, String username, String password) { // three-argument form: the topic-prefix flag is not a parameter here
this.uri = uri;
this.username = username;
this.password = password;
+ this.setTopicPrefix = true; // defaults to true, so AMQPComponent keeps applying "topic://" as it did unconditionally before this change
+ }
+
+ public AMQPConnectionDetails(String uri, String username, String password, boolean setTopicPrefix) { // setTopicPrefix=false makes AMQPComponent skip its setTopicPrefix("topic://") call
+ this.uri = uri;
+ this.username = username;
+ this.password = password;
+ this.setTopicPrefix = setTopicPrefix;
}
public AMQPConnectionDetails(String uri) {
@@ -53,8 +65,9 @@ public class AMQPConnectionDetails {
int port = Integer.parseInt(property(propertiesComponent, AMQP_PORT, "5672"));
String username = property(propertiesComponent, AMQP_USERNAME, null);
String password = property(propertiesComponent, AMQP_PASSWORD, null);
+ boolean setTopicPrefix = Boolean.parseBoolean(property(propertiesComponent, AMQP_SET_TOPIC_PREFIX, "true"));
- return new AMQPConnectionDetails("amqp://" + host + ":" + port, username, password);
+ return new AMQPConnectionDetails("amqp://" + host + ":" + port, username, password, setTopicPrefix);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -71,6 +84,10 @@ public class AMQPConnectionDetails {
public String password() {
return password;
}
+
+ public boolean setTopicPrefix() { // accessor despite the setter-like name: returns the flag and modifies nothing
+ return setTopicPrefix; // true means AMQPComponent should apply the "topic://" prefix to the connection factory
+ }
// Helpers
diff --git a/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java b/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java
new file mode 100644
index 0000000..563aa46
--- /dev/null
+++ b/components/camel-amqp/src/test/java/org/apache/camel/component/amqp/artemis/AMQPEmbeddedBrokerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.amqp.artemis;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.amqp.AMQPComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.AMQP_PORT;
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.AMQP_SET_TOPIC_PREFIX;
+import static org.apache.camel.component.amqp.AMQPConnectionDetails.discoverAMQP;
+
+/**
+ * Exercises the AMQP component against an embedded ActiveMQ Artemis broker with the
+ * {@code topic://} prefix switched off via {@code AMQP_SET_TOPIC_PREFIX=false}.
+ * <p>
+ * The broker is configured with address auto-creation disabled and a single explicitly
+ * declared multicast address named {@code topic.ping}, so the route can only work when
+ * the component addresses the topic by that exact, un-prefixed name.
+ */
+public class AMQPEmbeddedBrokerTest extends CamelTestSupport {
+
+    static int amqpPort = AvailablePortFinder.getNextAvailable();
+
+    static EmbeddedActiveMQ server = new EmbeddedActiveMQ();
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint resultEndpoint;
+
+    String expectedBody = "Hello there!";
+
+    /**
+     * Starts the embedded broker and publishes the connection settings as system
+     * properties for the AMQP connection discovery used in {@link #createCamelContext()}.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        Configuration config = new ConfigurationImpl();
+        AddressSettings addressSettings = new AddressSettings();
+        // With auto-creation off, a message sent to a prefixed (i.e. different) address
+        // cannot silently create it, so the test only passes for the exact name below.
+        addressSettings.setAutoCreateAddresses(false);
+        config.addAcceptorConfiguration("amqp", "tcp://0.0.0.0:" + amqpPort
+            + "?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300");
+        config.setPersistenceEnabled(false);
+        config.addAddressesSetting("#", addressSettings);
+        config.setSecurityEnabled(false);
+
+        // Declare the only address the broker will accept: a multicast (topic) address.
+        CoreAddressConfiguration pingTopicConfig = new CoreAddressConfiguration();
+        pingTopicConfig.setName("topic.ping");
+        pingTopicConfig.addRoutingType(RoutingType.MULTICAST);
+
+        config.addAddressConfiguration(pingTopicConfig);
+
+        server.setConfiguration(config);
+        server.start();
+        System.setProperty(AMQP_PORT, String.valueOf(amqpPort));
+        System.setProperty(AMQP_SET_TOPIC_PREFIX, "false");
+    }
+
+    /**
+     * Stops the embedded broker and removes the system properties set in
+     * {@link #beforeClass()}.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @AfterClass
+    public static void afterClass() throws Exception {
+        try {
+            server.stop();
+        } finally {
+            // System properties are JVM-wide: without this, the "no topic prefix" switch
+            // and the random port would leak into every test that runs later in this JVM.
+            System.clearProperty(AMQP_PORT);
+            System.clearProperty(AMQP_SET_TOPIC_PREFIX);
+        }
+    }
+
+    /**
+     * Sends one message through the topic and expects exactly one message on the mock endpoint.
+     */
+    @Test
+    public void testTopicWithoutPrefix() throws Exception {
+        resultEndpoint.expectedMessageCount(1);
+        template.sendBody("direct:send-topic", expectedBody);
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    /**
+     * Binds the discovered AMQP connection details into the registry under
+     * {@code amqpConnection} and registers an {@link AMQPComponent} as {@code amqp-customized}.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        JndiRegistry registry = (JndiRegistry)((PropertyPlaceholderDelegateRegistry)camelContext.getRegistry()).getRegistry();
+        registry.bind("amqpConnection", discoverAMQP(camelContext));
+        camelContext.addComponent("amqp-customized", new AMQPComponent());
+        return camelContext;
+    }
+
+    /**
+     * Builds a producer route into {@code topic.ping} and a consumer route from it
+     * that forwards to {@code mock:result}.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:send-topic")
+                    .to("amqp-customized:topic:topic.ping");
+
+                from("amqp-customized:topic:topic.ping")
+                    .to("log:routing")
+                    .to("mock:result");
+            }
+        };
+    }
+}