You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/05/07 21:35:06 UTC
[2/4] activemq-artemis git commit: ACTIVEMQ6-100 Add support for
HornetQ clients
ACTIVEMQ6-100 Add support for HornetQ clients
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/77efc950
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/77efc950
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/77efc950
Branch: refs/heads/master
Commit: 77efc950af9bd6af69561d6b7f5e701c2740cf22
Parents: f07af67
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu May 7 12:54:24 2015 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu May 7 14:59:46 2015 -0400
----------------------------------------------------------------------
.../artemis-hornetq-protocol/pom.xml | 53 +++++
.../HQPropertiesConversionInterceptor.java | 101 ++++++++
.../hornetq/HornetQProtocolManager.java | 62 +++++
.../hornetq/HornetQProtocolManagerFactory.java | 46 ++++
...mis.spi.core.protocol.ProtocolManagerFactory | 1 +
artemis-protocols/pom.xml | 1 +
.../protocol/core/impl/CoreProtocolManager.java | 4 +-
tests/extra-tests/pom.xml | 44 ++--
.../protocols/hornetq/HornetQProtocolTest.java | 232 +++++++++++++++++++
9 files changed, 525 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/artemis-hornetq-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/pom.xml b/artemis-protocols/artemis-hornetq-protocol/pom.xml
new file mode 100644
index 0000000..7cfc745
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/pom.xml
@@ -0,0 +1,53 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>artemis-protocols</artifactId>
+ <groupId>org.apache.activemq</groupId>
+ <version>1.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>artemis-hornetq-protocol</artifactId>
+
+ <properties>
+ <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging-processor</artifactId>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <!--
+ JBoss Logging
+ -->
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
new file mode 100644
index 0000000..93ec8bf
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.hornetq;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+public class HQPropertiesConversionInterceptor implements Interceptor
+{
+ private static Map<SimpleString, SimpleString> dictionary;
+
+ static
+ {
+ Map<SimpleString, SimpleString> d = new HashMap<SimpleString, SimpleString>();
+
+ // Add entries for outgoing messages
+ d.put(new SimpleString("_HQ_ACTUAL_EXPIRY"), new SimpleString("_AMQ_ACTUAL_EXPIRY"));
+ d.put(new SimpleString("_HQ_ORIG_ADDRESS"), new SimpleString("_AMQ_ORIG_ADDRESS"));
+ d.put(new SimpleString("_HQ_ORIG_QUEUE"), new SimpleString("_AMQ_ORIG_QUEUE"));
+ d.put(new SimpleString("_HQ_ORIG_MESSAGE_ID"), new SimpleString("_AMQ_ORIG_MESSAGE_ID"));
+ d.put(new SimpleString("_HQ_GROUP_ID"), new SimpleString("_AMQ_GROUP_ID"));
+ d.put(new SimpleString("_HQ_LARGE_COMPRESSED"), new SimpleString("_AMQ_LARGE_COMPRESSED"));
+ d.put(new SimpleString("_HQ_LARGE_SIZE"), new SimpleString("_AMQ_LARGE_SIZE"));
+ d.put(new SimpleString("_HQ_SCHED_DELIVERY"), new SimpleString("_AMQ_SCHED_DELIVERY"));
+ d.put(new SimpleString("_HQ_DUPL_ID"), new SimpleString("_AMQ_DUPL_ID"));
+ d.put(new SimpleString("_HQ_LVQ_NAME"), new SimpleString("_AMQ_LVQ_NAME"));
+
+ // Add entries for incoming messages
+ d.put(new SimpleString("_AMQ_ACTUAL_EXPIRY"), new SimpleString("_HQ_ACTUAL_EXPIRY"));
+ d.put(new SimpleString("_AMQ_ORIG_ADDRESS"), new SimpleString("_HQ_ORIG_ADDRESS"));
+ d.put(new SimpleString("_AMQ_ORIG_QUEUE"), new SimpleString("_HQ_ORIG_QUEUE"));
+ d.put(new SimpleString("_AMQ_ORIG_MESSAGE_ID"), new SimpleString("_HQ_ORIG_MESSAGE_ID"));
+ d.put(new SimpleString("_AMQ_GROUP_ID"), new SimpleString("_HQ_GROUP_ID"));
+ d.put(new SimpleString("_AMQ_LARGE_COMPRESSED"), new SimpleString("_HQ_LARGE_COMPRESSED"));
+ d.put(new SimpleString("_AMQ_LARGE_SIZE"), new SimpleString("_HQ_LARGE_SIZE"));
+ d.put(new SimpleString("_AMQ_SCHED_DELIVERY"), new SimpleString("_HQ_SCHED_DELIVERY"));
+ d.put(new SimpleString("_AMQ_DUPL_ID"), new SimpleString("_HQ_DUPL_ID"));
+ d.put(new SimpleString("_AMQ_LVQ_NAME"), new SimpleString("_HQ_LVQ_NAME"));
+
+ dictionary = Collections.unmodifiableMap(d);
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException
+ {
+ if (isMessagePacket(packet))
+ {
+ handleReceiveMessage((MessagePacket) packet);
+ }
+ return true;
+ }
+
+ private void handleReceiveMessage(MessagePacket messagePacket)
+ {
+ Message message = messagePacket.getMessage();
+ // We are modifying the key set so we iterate over a shallow copy.
+ for (SimpleString property : new HashSet<>(message.getPropertyNames()))
+ {
+ if (dictionary.containsKey(property))
+ {
+ message.putObjectProperty(dictionary.get(property), message.removeProperty(property));
+ }
+ }
+ }
+
+ private boolean isMessagePacket(Packet packet)
+ {
+ int type = packet.getType();
+ return type == PacketImpl.SESS_SEND ||
+ type == PacketImpl.SESS_SEND_CONTINUATION ||
+ type == PacketImpl.SESS_SEND_LARGE ||
+ type == PacketImpl.SESS_RECEIVE_LARGE_MSG ||
+ type == PacketImpl.SESS_RECEIVE_MSG;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
new file mode 100644
index 0000000..c8d113e
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.hornetq;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * HornetQ Protocol Manager
+ */
+class HornetQProtocolManager extends CoreProtocolManager
+{
+ HornetQProtocolManager(CoreProtocolManagerFactory factory, ActiveMQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+ {
+ super(factory, server, incomingInterceptors, outgoingInterceptors);
+ }
+
+ @Override
+ public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
+ {
+ //if we are not an old client then handshake
+ if (buffer.getByte(0) == 'H' &&
+ buffer.getByte(1) == 'O' &&
+ buffer.getByte(2) == 'R' &&
+ buffer.getByte(3) == 'N' &&
+ buffer.getByte(4) == 'E' &&
+ buffer.getByte(5) == 'T' &&
+ buffer.getByte(6) == 'Q')
+ {
+ //todo add some handshaking
+ buffer.readBytes(7);
+ }
+ }
+
+ @Override
+ public boolean isProtocol(byte[] array)
+ {
+ String frameStart = new String(array, StandardCharsets.US_ASCII);
+ return frameStart.startsWith("HORNETQ");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
new file mode 100644
index 0000000..956fdb7
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.hornetq;
+
+import java.util.List;
+
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+
+public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory
+{
+ public static final String HORNETQ_PROTOCOL_NAME = "HORNETQ";
+
+ private static String[] SUPPORTED_PROTOCOLS = {HORNETQ_PROTOCOL_NAME};
+
+ public ProtocolManager createProtocolManager(final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+ {
+ Interceptor propertyConversionInterceptor = new HQPropertiesConversionInterceptor();
+ incomingInterceptors.add(propertyConversionInterceptor);
+ outgoingInterceptors.add(propertyConversionInterceptor);
+ return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors);
+ }
+
+ @Override
+ public String[] getProtocols()
+ {
+ return SUPPORTED_PROTOCOLS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/artemis-hornetq-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/artemis-protocols/artemis-hornetq-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
new file mode 100644
index 0000000..059e800
--- /dev/null
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
@@ -0,0 +1 @@
+org.apache.activemq.artemis.core.protocol.hornetq.HornetQProtocolManagerFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-protocols/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml
index 4434430..8ced7bb 100644
--- a/artemis-protocols/pom.xml
+++ b/artemis-protocols/pom.xml
@@ -35,6 +35,7 @@
<module>artemis-stomp-protocol</module>
<module>artemis-openwire-protocol</module>
<module>artemis-proton-plug</module>
+ <module>artemis-hornetq-protocol</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index f56e017..4c5b5ca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -59,7 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
-class CoreProtocolManager implements ProtocolManager<Interceptor>
+public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -71,7 +71,7 @@ class CoreProtocolManager implements ProtocolManager<Interceptor>
private final CoreProtocolManagerFactory protocolManagerFactory;
- CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+ public CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
{
this.protocolManagerFactory = factory;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
index 8ad559c..364f2c6 100644
--- a/tests/extra-tests/pom.xml
+++ b/tests/extra-tests/pom.xml
@@ -36,6 +36,8 @@
<tools.jar>${java.home}/../lib/tools.jar</tools.jar>
<byteman.version>2.2.0</byteman.version>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
+ <jboss-jts.version>4.17.13.Final</jboss-jts.version>
+ <hornetq.version>2.4.7.Final</hornetq.version>
</properties>
<dependencies>
@@ -179,19 +181,38 @@
<scope>test</scope>
</dependency>
-
- <!-- Needed for JMS Bridge Tests -->
<dependency>
- <groupId>org.jboss.jbossts.jts</groupId>
- <artifactId>jbossjts-jacorb</artifactId>
- <version>4.17.13.Final</version>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-hornetq-protocol</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
</dependency>
+
+ <!-- Needed for JMS Bridge Tests -->
<dependency>
<groupId>org.jboss</groupId>
<artifactId>jboss-transaction-spi</artifactId>
<version>7.1.0.Final</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-commons</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-core-client</artifactId>
+ <version>${hornetq.version}</version>
</dependency>
+ <!-- Needed for XA tests -->
+ <dependency>
+ <groupId>org.jboss.jbossts.jts</groupId>
+ <artifactId>jbossjts-jacorb</artifactId>
+ <version>4.17.13.Final</version>
+ </dependency>
</dependencies>
<build>
@@ -211,7 +232,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.18.1</version>
<configuration>
<skipTests>${skipExtraTests}</skipTests>
<!-- ensure we don't inherit a byteman jar form any env settings -->
@@ -219,14 +239,6 @@
<BYTEMAN_HOME></BYTEMAN_HOME>
</environmentVariables>
<systemProperties>
- <property>
- <name>com.arjuna.ats.arjuna.objectstore.objectStoreDir</name>
- <value>target/ObjectStore</value>
- </property>
- <property>
- <name>ObjectStoreEnvironmentBean.objectStoreDir</name>
- <value>target/ObjectStore</value>
- </property>
<!--
<property>
<name>org.jboss.byteman.home</name>
@@ -248,9 +260,7 @@
</systemProperties>
<!-- make sure maven puts the byteman jar in the classpath rather than in a manifest jar -->
<useManifestOnlyJar>false</useManifestOnlyJar>
- <!-- when upgrading this plugin from 2.4 to 2.18.1 <forkMode>once</forkMode> was replaced with these: -->
- <forkCount>1</forkCount>
- <reuseForks>true</reuseForks>
+ <forkMode>once</forkMode>
<!--
<debugForkedProcess>true</debugForkedProcess>
-->
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77efc950/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
new file mode 100644
index 0000000..0b37887
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/protocols/hornetq/HornetQProtocolTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.protocols.hornetq;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ServiceTestBase;
+
+import org.hornetq.api.core.client.HornetQClient;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class HornetQProtocolTest extends ServiceTestBase
+{
+ protected ActiveMQServer server;
+
+ private static final Logger LOG = LoggerFactory.getLogger(HornetQProtocolTest.class);
+
+ @Before
+ public void setUp() throws Exception
+ {
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ stopBroker();
+ }
+
+ public void startBroker() throws Exception
+ {
+ super.setUp();
+ server = createServer(true, true);
+ addHornetQConnector();
+ server.start();
+ waitForServer(server);
+ }
+
+ public void stopBroker() throws Exception
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ server = null;
+ }
+ }
+
+ protected void addHornetQConnector() throws Exception
+ {
+ HashMap<String, Object> params = new HashMap<String, Object>();
+ params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, "" + 5445);
+ params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PROTOCOLS_PROP_NAME, "HORNETQ");
+ TransportConfiguration transportConfig = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+ server.getConfiguration().getAcceptorConfigurations().add(transportConfig);
+ LOG.info("Added connector {} to broker", "HornetQ");
+ }
+
+ @Test
+ public void testMessagePropertiesAreTransformedBetweenCoreAndHQProtocols() throws Exception
+ {
+ org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
+ ClientSession coreSession = createCoreClientSession();
+
+ // Create Queue
+ String queueName = "test.hq.queue";
+ hqSession.createQueue(queueName, queueName, true);
+
+ // HornetQ Client Objects
+ hqSession.start();
+ org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
+ org.hornetq.api.core.client.ClientConsumer hqConsumer = hqSession.createConsumer(queueName);
+
+ // Core Client Objects
+ coreSession.start();
+ ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
+
+ // Check that HornetQ Properties are correctly converted to core properties.
+ for (int i = 0; i < 2; i++)
+ {
+ hqProducer.send(createHQTestMessage(hqSession));
+ }
+
+ ClientMessage coreMessage1 = coreConsumer.receive(1000);
+ assertTrue(coreMessage1.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
+ coreSession.close();
+
+ // Check that HornetQ Properties are correctly transformed from then to HornetQ properties
+ org.hornetq.api.core.client.ClientMessage hqMessage1 = hqConsumer.receive(1000);
+ assertTrue(hqMessage1.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
+
+ hqSession.close();
+ }
+
+ @Test
+ public void testDuplicateIDPropertyWithHornetQProtocol() throws Exception
+ {
+ org.hornetq.api.core.client.ClientSession session = createHQClientSession();
+
+ String queueName = "test.hq.queue";
+ session.createQueue(queueName, queueName, true);
+
+ org.hornetq.api.core.client.ClientProducer producer = session.createProducer(queueName);
+ org.hornetq.api.core.client.ClientConsumer consumer = session.createConsumer(queueName);
+ org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
+
+ String messageId = UUID.randomUUID().toString();
+ message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
+
+ session.start();
+ producer.send(message);
+ org.hornetq.api.core.client.ClientMessage m = consumer.receive(1000);
+ assertTrue(m.containsProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID));
+ assertNotNull(m);
+
+ producer.send(message);
+ m = consumer.receive(1000);
+ assertNull(m);
+
+ producer.send(message);
+ m = consumer.receive(1000);
+ assertNull(m);
+
+ session.close();
+ }
+
+ @Test
+ public void testDuplicateIDPropertyWithHornetQAndCoreProtocol() throws Exception
+ {
+ org.hornetq.api.core.client.ClientSession hqSession = createHQClientSession();
+
+ String queueName = "test.hq.queue";
+ hqSession.createQueue(queueName, queueName, true);
+
+ org.hornetq.api.core.client.ClientProducer hqProducer = hqSession.createProducer(queueName);
+ org.hornetq.api.core.client.ClientMessage message = hqSession.createMessage(false);
+
+ String messageId = UUID.randomUUID().toString();
+ message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageId);
+
+ ClientSession coreSession = createCoreClientSession();
+ ClientConsumer coreConsumer = coreSession.createConsumer(queueName);
+
+ hqSession.start();
+ coreSession.start();
+
+ hqProducer.send(message);
+ Message m = coreConsumer.receive(1000);
+ assertTrue(m.containsProperty(Message.HDR_DUPLICATE_DETECTION_ID));
+ assertNotNull(m);
+
+
+ hqProducer.send(message);
+ m = coreConsumer.receive(1000);
+ assertNull(m);
+
+ hqProducer.send(message);
+ m = coreConsumer.receive(1000);
+ assertNull(m);
+ }
+
+ private org.hornetq.api.core.client.ClientMessage createHQTestMessage(org.hornetq.api.core.client.ClientSession session)
+ {
+ org.hornetq.api.core.client.ClientMessage message = session.createMessage(false);
+ String v = UUID.randomUUID().toString();
+ message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
+ return message;
+ }
+
+ private ClientMessage createCoreTestMessage(ClientSession session)
+ {
+ ClientMessage message = session.createMessage(false);
+ String v = UUID.randomUUID().toString();
+ message.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), v);
+ return message;
+ }
+
+ private org.hornetq.api.core.client.ClientSession createHQClientSession() throws Exception
+ {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("host", "localhost");
+ map.put("port", 5445);
+
+ org.hornetq.api.core.client.ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new org.hornetq.api.core.TransportConfiguration(org.hornetq.core.remoting.impl.netty.NettyConnectorFactory.class.getName(), map));
+ org.hornetq.api.core.client.ClientSessionFactory sf = serverLocator.createSessionFactory();
+
+ return sf.createSession();
+ }
+
+ private ClientSession createCoreClientSession() throws Exception
+ {
+ Map<String, Object> map = new HashMap<String, Object>();
+ map.put("host", "localhost");
+ map.put("port", 61616);
+
+ ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
+
+ return sf.createSession();
+ }
+}