You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:20 UTC
[02/21] activemq-artemis git commit: ARTEMIS-1019 removing vertx
integration
ARTEMIS-1019 removing vertx integration
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c0fe1876
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c0fe1876
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c0fe1876
Branch: refs/heads/artemis-1009
Commit: c0fe187666c0082b8491b1d282cdc93ddc7cdb1e
Parents: 2a3885d
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Feb 6 19:57:16 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sun Mar 5 22:57:07 2017 -0500
----------------------------------------------------------------------
artemis-distribution/pom.xml | 5 -
docs/user-manual/en/SUMMARY.md | 1 -
docs/user-manual/en/vertx-integration.md | 88 ---
examples/features/sub-modules/pom.xml | 1 -
examples/features/sub-modules/vertx/pom.xml | 142 ----
examples/features/sub-modules/vertx/readme.html | 103 ---
.../artemis/core/example/ExampleVerticle.java | 53 --
.../core/example/VertxConnectorExample.java | 103 ---
.../main/resources/activemq/server0/broker.xml | 82 --
integration/activemq-vertx-integration/pom.xml | 144 ----
.../integration/vertx/ActiveMQVertxLogger.java | 55 --
.../vertx/IncomingVertxEventHandler.java | 265 -------
.../vertx/OutgoingVertxEventHandler.java | 290 -------
.../integration/vertx/VertxConstants.java | 82 --
.../VertxIncomingConnectorServiceFactory.java | 51 --
.../VertxOutgoingConnectorServiceFactory.java | 49 --
pom.xml | 6 -
tests/integration-tests/pom.xml | 38 -
.../integration/mqtt/imported/MQTTTest.java | 2 +-
.../vertx/ActiveMQVertxUnitTest.java | 774 -------------------
20 files changed, 1 insertion(+), 2333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index 1236f17..7d5cc49 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -78,11 +78,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-vertx-integration</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.activemq.rest</groupId>
<artifactId>artemis-rest</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/docs/user-manual/en/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index 4030aa9..b612a5e 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -52,7 +52,6 @@
* [Apache Karaf](karaf.md)
* [Spring Integration](spring-integration.md)
* [AeroGear Integration](aerogear-integration.md)
-* [VertX Integration](vertx-integration.md)
* [CDI Integration](cdi-integration.md)
* [Intercepting Operations](intercepting-operations.md)
* [Protocols and Interoperability](protocols-interoperability.md)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/docs/user-manual/en/vertx-integration.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/vertx-integration.md b/docs/user-manual/en/vertx-integration.md
deleted file mode 100644
index f823f11..0000000
--- a/docs/user-manual/en/vertx-integration.md
+++ /dev/null
@@ -1,88 +0,0 @@
-# Vert.x Integration
-
-[Vert.x](http://vertx.io/) is a lightweight, high performance
-application platform for the JVM that's designed for modern mobile, web,
-and enterprise applications. Vert.x provides a distributed event bus
-that allows messages to be sent across vert.x instances and clients. You
-can now redirect and persist any vert.x messages to Apache ActiveMQ Artemis and route
-those messages to a specified vertx address by configuring Apache ActiveMQ Artemis
-incoming and outgoing vertx connector services.
-
-## Configuring a Vertx Incoming Connector Service
-
-Vertx Incoming Connector services receive messages from vertx event bus
-and route them to an Apache ActiveMQ Artemis queue. Such a service can be configured as
-follows:
-
- <connector-service name="vertx-incoming-connector">
- <factory-class>org.apache.activemq.artemis.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class>
- <param key="host" value="127.0.0.1"/>
- <param key="port" value="0"/>
- <param key="queue" value="jms.queue.vertxQueue"/>
- <param key="vertx-address" value="vertx.in.eventaddress"/>
- </connector-service>
-
-
-Shown are the required params for the connector service:
-
-- `queue`. The name of the Apache ActiveMQ Artemis queue to send messages to.
-
-As well as these required parameters there are the following optional
-parameters
-
-- `host`. The host name on which the vertx target container is
- running. Default is localhost.
-
-- `port`. The port number to which the target vertx listens. Default
- is zero.
-
-- `quorum-size`. The quorum size of the target vertx instance.
-
-- `ha-group`. The name of the ha-group of target vertx instance.
- Default is `activemq`.
-
-- `vertx-address`. The vertx address to listen to. Default is
- `org.apache.activemq`.
-
-## Configuring a Vertx Outgoing Connector Service
-
-Vertx Outgoing Connector services fetch vertx messages from an Apache ActiveMQ Artemis
-queue and put them on the vertx event bus. Such a service can be configured
-as follows:
-
- <connector-service name="vertx-outgoing-connector">
- <factory-class>org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class>
- <param key="host" value="127.0.0.1"/>
- <param key="port" value="0"/>
- <param key="queue" value="jms.queue.vertxQueue"/>
- <param key="vertx-address" value="vertx.out.eventaddress"/>
- <param key="publish" value="true"/>
- </connector-service>
-
-
-Shown are the required params for the connector service:
-
-- `queue`. The name of the Apache ActiveMQ Artemis queue to fetch messages from.
-
-As well as these required parameters there are the following optional
-parameters
-
-- `host`. The host name on which the vertx target container is
- running. Default is localhost.
-
-- `port`. The port number to which the target vertx listens. Default
- is zero.
-
-- `quorum-size`. The quorum size of the target vertx instance.
-
-- `ha-group`. The name of the ha-group of target vertx instance.
- Default is `activemq`.
-
-- `vertx-address`. The vertx address to put messages to. Default is
- `org.apache.activemq`.
-
-- `publish`. How messages are sent to the vertx event bus. "true" means
- using publish style. "false" means using send style. Default is
- false.
-
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/pom.xml
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/pom.xml b/examples/features/sub-modules/pom.xml
index d59f915..ef47365 100644
--- a/examples/features/sub-modules/pom.xml
+++ b/examples/features/sub-modules/pom.xml
@@ -51,7 +51,6 @@ under the License.
<modules>
<module>aerogear</module>
<module>artemis-ra-rar</module>
- <module>vertx</module>
</modules>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/vertx/pom.xml
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/vertx/pom.xml b/examples/features/sub-modules/vertx/pom.xml
deleted file mode 100644
index c09aada..0000000
--- a/examples/features/sub-modules/vertx/pom.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version='1.0'?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.activemq.examples.modules</groupId>
- <artifactId>broker-modules</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>artemis-vertx-example</artifactId>
- <packaging>jar</packaging>
- <name>ActiveMQ Artemis Vert.x Example</name>
-
- <properties>
- <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
- <vertx.version>2.1.2</vertx.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-server</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-core-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-commons</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>${netty.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_2.0_spec</artifactId>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-core</artifactId>
- <version>${vertx.version}</version>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-platform</artifactId>
- <version>${vertx.version}</version>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-hazelcast</artifactId>
- <version>${vertx.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-vertx-integration</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>create0</id>
- <goals>
- <goal>create</goal>
- </goals>
- <configuration>
- <libListWithDeps>
- <arg>org.apache.activemq.examples.modules:artemis-vertx-example:${project.version}</arg>
- </libListWithDeps>
-
- <instance>${basedir}/target/server0</instance>
- <configuration>${basedir}/target/classes/activemq/server0</configuration>
- </configuration>
- </execution>
- <execution>
- <id>start0</id>
- <goals>
- <goal>cli</goal>
- </goals>
- <configuration>
- <ignore>${noServer}</ignore>
- <spawn>true</spawn>
- <location>${basedir}/target/server0</location>
- <testURI>tcp://localhost:61616</testURI>
- <args>
- <param>run</param>
- </args>
- <name>server0</name>
- </configuration>
- </execution>
- <execution>
- <id>runClient</id>
- <goals>
- <goal>runClient</goal>
- </goals>
- <configuration>
- <clientClass>org.apache.activemq.artemis.core.example.VertxConnectorExample</clientClass>
- </configuration>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.apache.activemq.examples.modules</groupId>
- <artifactId>artemis-vertx-example</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/vertx/readme.html
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/vertx/readme.html b/examples/features/sub-modules/vertx/readme.html
deleted file mode 100644
index e8f053a..0000000
--- a/examples/features/sub-modules/vertx/readme.html
+++ /dev/null
@@ -1,103 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<html>
- <head>
- <title>ActiveMQ Artemis Vert.x Connector Service Example</title>
- <link rel="stylesheet" type="text/css" href="../../../common/common.css" />
- <link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
- <script type="text/javascript" src="../../../common/prettify.js"></script>
- </head>
- <body onload="prettyPrint()">
- <h1>Vert.x Connector Service Example</h1>
-
- <p>This example shows you how to configure ActiveMQ Artemis to use the Vert.x Connector Service.</p>
-
- <p>ActiveMQ Artemis supports 2 types of Vert.x connector, incoming and outgoing.
- Incoming connector consumes from Vert.x event bus and forwards to a configurable address.
- Outgoing connector consumes from a configurable address and forwards to a configurable Vert.x event bus.
- </p>
-
- <p>In this example, an incoming connector and an outgoing connector are configured. A simple java Verticle
- is deployed. The verticle registers a message handler on the outgoing connector's address ("outgoing.vertx.address").
- A String message is sent to Vert.x event bus on the incoming connector's address("incoming.vertx.address").
- The message then will be forwarded to an ActiveMQ Artemis queue by the incoming connector. The outgoing connector listens to
- the ActiveMQ Artemis queue and forwards the message from ActiveMQ Artemis to Vert.x event bus on the outgoing connector's address.
- The verticle finally receives the message from its event bus.</p>
-
- <p>For more information on Vert.x concept please visit the <a href="http://vertx.io/">Vertx site</a></p>
-
- <h2>Example step-by-step</h2>
- <p><i>To run the server, simply type <code>mvn verify</code>
- from this directory.</i></p>
-
- <ol>
- <li>First we need to create a Vert.x PlatformManager</li>
- <pre class="prettyprint">
- <code>platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);</code>
- </pre>
-
- <li>We deploy a Verticle using the platformManager</li>
- <pre class="prettyprint">
- <code>String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle";
- platformManager.deployVerticle(verticle, null, new URL[0], 1, null,
- new Handler&lt;AsyncResult&lt;String&gt;&gt;(){
-
- @Override
- public void handle(AsyncResult&lt;String&gt; result)
- {
- if (!result.succeeded())
- {
- throw new RuntimeException("failed to deploy verticle", result.cause());
- }
- latch0.countDown();
- }
-
- });</code>
- </pre>
-
- <li>We register a message handler with the event bus in the Verticle to listen on the outgoing connector's address.</li>
- <pre class="prettyprint">
- <code>EventBus eventBus = vertx.eventBus();
- eventBus.registerHandler(VertxConnectorExample.OUTGOING,
- new Handler&lt;Message&lt;?&gt;&gt;() {
- @Override
- public void handle(Message&lt;?&gt; startMsg)
- {
- Object body = startMsg.body();
- System.out.println("Verticle receives a message: " + body);
- VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
- latch0.countDown();
- }
- });
- </code>
- </pre>
-
- <li>We send a message to incoming connector's address via event bus</li>
- <pre class="prettyprint">
- <code>
- EventBus bus = platformManager.vertx().eventBus();
- bus.send(INCOMING, MSG);
- </code>
- </pre>
-
- <li>The message will eventually arrive at the Verticle's message handler.</li>
- </ol>
- </body>
-</html>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java b/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java
deleted file mode 100644
index 3f248af..0000000
--- a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/ExampleVerticle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.example;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.core.eventbus.Message;
-import org.vertx.java.platform.Verticle;
-
-/**
- * Verticle used by {@link VertxConnectorExample}: it listens on the outgoing connector's
- * event bus address and records whether the received body equals the message the example sent.
- */
-public class ExampleVerticle extends Verticle {
-
-   /**
-    * Registers the message handler on {@link VertxConnectorExample#OUTGOING} and then waits
-    * up to five seconds for the first message to be handled.
-    */
-   @Override
-   public void start() {
-      EventBus eventBus = vertx.eventBus();
-
-      final CountDownLatch latch0 = new CountDownLatch(1);
-
-      // Register a handler on the outgoing connector's address
-      eventBus.registerHandler(VertxConnectorExample.OUTGOING, new Handler<Message<?>>() {
-         @Override
-         public void handle(Message<?> startMsg) {
-            Object body = startMsg.body();
-            System.out.println("Verticle receives a message: " + body);
-            // The example succeeds only if the body is exactly the message that was sent.
-            VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
-            latch0.countDown();
-            //Tell the example to finish.
-            VertxConnectorExample.latch.countDown();
-         }
-      });
-
-      try {
-         latch0.await(5000, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-         // Do not swallow the interruption: restore the flag so the calling thread can react to it.
-         Thread.currentThread().interrupt();
-      }
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java b/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java
deleted file mode 100644
index b8e6d98..0000000
--- a/examples/features/sub-modules/vertx/src/main/java/org/apache/activemq/artemis/core/example/VertxConnectorExample.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.example;
-
-import java.net.URL;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.vertx.java.core.AsyncResult;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.platform.PlatformLocator;
-import org.vertx.java.platform.PlatformManager;
-import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
-
-/**
- * A simple example of using Vert.x connector service.
- * <p>
- * The example deploys {@code ExampleVerticle}, sends {@link #MSG} to the {@link #INCOMING}
- * event bus address and then waits on {@link #latch}. The verticle sets {@link #result} and
- * counts the latch down once it has handled a message on the {@link #OUTGOING} address.
- */
-public class VertxConnectorExample {
-
-   public static final String INCOMING = "incoming.vertx.address";
-   public static final String OUTGOING = "outgoing.vertx.address";
-   public static final String MSG = "Welcome to Vertx world!";
-
-   /** Counted down by the verticle to tell the example to finish. */
-   public static final CountDownLatch latch = new CountDownLatch(1);
-   /** Set by the verticle to {@code true} when the received body equals {@link #MSG}. */
-   public static final AtomicBoolean result = new AtomicBoolean(false);
-
-   private static final String HOST = "127.0.0.1";
-   private static final int PORT = 0;
-
-   /**
-    * Runs the example and terminates the JVM with exit code 0 on success or 1 on failure.
-    *
-    * @param args ignored
-    * @throws Exception kept for source compatibility; failures are printed and reported
-    *                   through the exit code
-    */
-   public static void main(final String[] args) throws Exception {
-      System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
-      PlatformManager platformManager = null;
-
-      try {
-         // Step 1 Create a Vert.x PlatformManager
-         platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);
-
-         final CountDownLatch latch0 = new CountDownLatch(1);
-         final AtomicReference<RuntimeException> deployFailure = new AtomicReference<RuntimeException>();
-
-         // Step 2 Deploy a Verticle to receive message
-         String verticle = "org.apache.activemq.artemis.core.example.ExampleVerticle";
-         platformManager.deployVerticle(verticle, null, new URL[0], 1, null, new Handler<AsyncResult<String>>() {
-
-            @Override
-            public void handle(AsyncResult<String> deployment) {
-               // Throwing here would only kill the callback and leave main blocked on latch0,
-               // so hand the failure over to the main thread and always release the latch.
-               if (!deployment.succeeded()) {
-                  deployFailure.set(new RuntimeException("failed to deploy verticle", deployment.cause()));
-               }
-               latch0.countDown();
-            }
-
-         });
-
-         latch0.await();
-
-         RuntimeException failure = deployFailure.get();
-         if (failure != null) {
-            throw failure;
-         }
-
-         // Step 3 Send a message to the incoming connector's address
-         EventBus bus = platformManager.vertx().eventBus();
-         bus.send(INCOMING, MSG);
-
-         // Step 4 Waiting for the Verticle to process the message
-         latch.await(10000, TimeUnit.MILLISECONDS);
-      } catch (Exception e) {
-         // System.exit in the finally block would otherwise discard the exception unseen.
-         e.printStackTrace();
-      } finally {
-         try {
-            if (platformManager != null) {
-               platformManager.undeployAll(null);
-               platformManager.stop();
-            }
-         } finally {
-            // Always report and exit, even if shutting down the platform manager failed.
-            reportResultAndExit();
-         }
-      }
-   }
-
-   /**
-    * Prints a SUCCESS or FAILURE banner depending on {@link #result} and exits the JVM
-    * with status 0 or 1 respectively.
-    */
-   private static void reportResultAndExit() {
-      if (!result.get()) {
-         System.err.println();
-         System.err.println("#####################");
-         System.err.println("### FAILURE! ###");
-         System.err.println("#####################");
-         System.exit(1);
-      } else {
-         System.out.println();
-         System.out.println("#####################");
-         System.out.println("### SUCCESS! ###");
-         System.out.println("#####################");
-         System.exit(0);
-      }
-   }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml b/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml
deleted file mode 100644
index c550203..0000000
--- a/examples/features/sub-modules/vertx/src/main/resources/activemq/server0/broker.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
---><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
-
- <core xmlns="urn:activemq:core">
- <bindings-directory>target/server0/data/messaging/bindings</bindings-directory>
-
- <journal-directory>target/server0/data/messaging/journal</journal-directory>
-
- <large-messages-directory>target/server0/data/messaging/largemessages</large-messages-directory>
-
- <paging-directory>target/server0/data/messaging/paging</paging-directory>
- <!-- Connectors -->
-
- <connectors>
- <connector name="netty-connector">tcp://localhost:61616</connector>
- </connectors>
-
- <!-- Acceptors -->
- <acceptors>
- <acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
- </acceptors>
-
- <!-- Other config -->
-
- <security-settings>
- <!--security for example queue-->
- <security-setting match="queue.vertxQueue">
- <permission roles="guest" type="consume"/>
- <permission roles="guest" type="send"/>
- </security-setting>
- </security-settings>
-
-
-
- <connector-services>
- <connector-service name="my-incoming-vertx">
- <factory-class>org.apache.activemq.artemis.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class>
- <param key="queue" value="queue.vertxQueue"/>
- <param key="host" value="localhost"/>
- <param key="port" value="0"/>
- <param key="vertx-address" value="incoming.vertx.address"/>
- </connector-service>
- <connector-service name="my-outgoing-vertx">
- <factory-class>org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class>
- <param key="queue" value="queue.vertxQueue"/>
- <param key="host" value="localhost"/>
- <param key="port" value="0"/>
- <param key="vertx-address" value="outgoing.vertx.address"/>
- </connector-service>
- </connector-services>
- <addresses>
- <address name="queue.vertxQueue">
- <multicast>
- <queue name="queue.vertxQueue"/>
- </multicast>
- </address>
- <address name="exampleQueue">
- <anycast>
- <queue name="jms.queue.exampleQueue"/>
- </anycast>
- </address>
- </addresses>
- </core>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/pom.xml b/integration/activemq-vertx-integration/pom.xml
deleted file mode 100644
index ef8d31c..0000000
--- a/integration/activemq-vertx-integration/pom.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-pom</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>artemis-vertx-integration</artifactId>
- <packaging>jar</packaging>
- <name>ActiveMQ Artemis Vert.x Integration</name>
-
- <properties>
-
- <activemq.basedir>${project.basedir}/../..</activemq.basedir>
-
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
- <!-- Set pullInDeps to true if you want any modules specified in the 'includes' and 'deploys' fields
- in your mod.json to be automatically pulled in during packaging and added inside your module. Doing this means your
- module won't download and install those dependencies at run-time when they're first requested. -->
- <vertx.pullInDeps>false</vertx.pullInDeps>
-
- <!-- Set createFatJar to true if you want to create a fat executable jar which contains the Vert.x binaries
- along with the module so it can be run with java -jar <jarname> -->
- <vertx.createFatJar>false</vertx.createFatJar>
-
- <!--Vertx module name-->
- <module.name>${project.groupId}~${project.artifactId}~${project.version}</module.name>
-
- <!-- The directory where the module will be assembled - you can override this on the command line
- with -Dmods.directory=mydir -->
- <mods.directory>target/mods</mods.directory>
-
- <!--Dependency versions-->
- <vertx.version>2.1.2</vertx.version>
- <vertx.testtools.version>2.0.3-final</vertx.testtools.version>
- <junit.version>4.11</junit.version>
-
- <!--Plugin versions-->
- <maven.compiler.plugin.version>3.0</maven.compiler.plugin.version>
- <maven.resources.plugin.version>2.6</maven.resources.plugin.version>
- <maven.clean.plugin.version>2.5</maven.clean.plugin.version>
- <maven.vertx.plugin.version>2.0.8-final</maven.vertx.plugin.version>
- <maven.surefire.plugin.version>2.14</maven.surefire.plugin.version>
- <maven.failsafe.plugin.version>2.14</maven.failsafe.plugin.version>
- <maven.surefire.report.plugin.version>2.14</maven.surefire.report.plugin.version>
- <maven.javadoc.plugin.version>2.9</maven.javadoc.plugin.version>
- <maven.dependency.plugin.version>2.7</maven.dependency.plugin.version>
- </properties>
-
- <repositories>
- <repository>
- <id>sonatype-nexus-snapshots</id>
- <url>https://oss.sonatype.org/content/repositories/public</url>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>org.jboss.logging</groupId>
- <artifactId>jboss-logging-processor</artifactId>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
-
- <!--
- JBoss Logging
- -->
- <dependency>
- <groupId>org.jboss.logging</groupId>
- <artifactId>jboss-logging</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>artemis-server</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!--Vertx provided dependencies-->
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-core</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-platform</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-hazelcast</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- </dependency>
- <!--Test dependencies-->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>testtools</artifactId>
- <version>${vertx.testtools.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Add any other dependencies that you want packaged into your module (in the lib dir) here
- as 'compile' dependencies. Here is an example
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- <version>1.3</version>
- <scope>compile</scope>
- </dependency>
- -->
-
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java
deleted file mode 100644
index 1f30c4c..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/ActiveMQVertxLogger.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.jboss.logging.BasicLogger;
-import org.jboss.logging.Logger;
-import org.jboss.logging.annotations.LogMessage;
-import org.jboss.logging.annotations.Message;
-import org.jboss.logging.annotations.MessageLogger;
-
-/**
- * Logger Code 19
- *
- * each message id must be 6 digits long starting with 19, the 3rd digit donates the level so
- *
- * INF0 1
- * WARN 2
- * DEBUG 3
- * ERROR 4
- * TRACE 5
- * FATAL 6
- *
- * so an INFO message would be 191000 to 191999
- */
-@MessageLogger(projectCode = "AMQ")
-interface ActiveMQVertxLogger extends BasicLogger {
-
- /**
- * The vertx logger.
- */
- ActiveMQVertxLogger LOGGER = Logger.getMessageLogger(ActiveMQVertxLogger.class, ActiveMQVertxLogger.class.getPackage().getName());
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 192001, value = "Non vertx message: {0}", format = Message.Format.MESSAGE_FORMAT)
- void nonVertxMessage(ServerMessage message);
-
- @LogMessage(level = Logger.Level.WARN)
- @Message(id = 192002, value = "Invalid vertx type: {0}", format = Message.Format.MESSAGE_FORMAT)
- void invalidVertxType(Integer type);
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
deleted file mode 100644
index 4d89e6d..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.server.ConnectorService;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.utils.ConfigurationHelper;
-import org.vertx.java.core.Handler;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.core.eventbus.Message;
-import org.vertx.java.core.eventbus.ReplyException;
-import org.vertx.java.core.eventbus.impl.PingMessage;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
-import org.vertx.java.platform.PlatformLocator;
-import org.vertx.java.platform.PlatformManager;
-import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
-
-class IncomingVertxEventHandler implements ConnectorService {
-
- private final String connectorName;
-
- private final String queueName;
-
- private final int port;
-
- private final String host;
-
- private final int quorumSize;
-
- private final String haGroup;
-
- private final String vertxAddress;
-
- private EventBus eventBus;
-
- private PlatformManager platformManager;
-
- private EventHandler handler;
-
- private final StorageManager storageManager;
-
- private final PostOffice postOffice;
-
- private boolean isStarted = false;
-
- IncomingVertxEventHandler(String connectorName,
- Map<String, Object> configuration,
- StorageManager storageManager,
- PostOffice postOffice) {
- this.connectorName = connectorName;
- this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
-
- this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
- this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration);
- this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration);
- this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration);
- this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration);
-
- this.storageManager = storageManager;
- this.postOffice = postOffice;
- }
-
- @Override
- public void start() throws Exception {
- if (this.isStarted) {
- return;
- }
- System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
- if (quorumSize != -1) {
- platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
- } else {
- platformManager = PlatformLocator.factory.createPlatformManager(port, host);
- }
-
- eventBus = platformManager.vertx().eventBus();
-
- Binding b = postOffice.getBinding(new SimpleString(queueName));
- if (b == null) {
- throw new Exception(connectorName + ": queue " + queueName + " not found");
- }
-
- handler = new EventHandler();
- eventBus.registerHandler(vertxAddress, handler);
-
- isStarted = true;
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started");
- }
-
- @Override
- public void stop() throws Exception {
- if (!isStarted) {
- return;
- }
- eventBus.unregisterHandler(vertxAddress, handler);
- platformManager.stop();
- System.clearProperty("vertx.clusterManagerFactory");
- isStarted = false;
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped");
- }
-
- @Override
- public boolean isStarted() {
- return isStarted;
- }
-
- @Override
- public String getName() {
- return connectorName;
- }
-
- private class EventHandler implements Handler<Message<?>> {
-
- @Override
- public void handle(Message<?> message) {
- ServerMessage msg = new ServerMessageImpl(storageManager.generateID(), VertxConstants.INITIAL_MESSAGE_BUFFER_SIZE);
- msg.setAddress(new SimpleString(queueName));
- msg.setDurable(true);
- msg.encodeMessageIDToBuffer();
-
- String replyAddress = message.replyAddress();
- if (replyAddress != null) {
- msg.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress);
- }
-
- // it'd be better that Message expose its type information
- int type = getMessageType(message);
-
- msg.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, type);
-
- manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
-
- try {
- postOffice.route(msg, false);
- } catch (Exception e) {
- ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
- }
- }
-
- private void manualEncodeVertxMessageBody(ActiveMQBuffer bodyBuffer, Object body, int type) {
- switch (type) {
- case VertxConstants.TYPE_BOOLEAN:
- bodyBuffer.writeBoolean(((Boolean) body));
- break;
- case VertxConstants.TYPE_BUFFER:
- Buffer buff = (Buffer) body;
- int len = buff.length();
- bodyBuffer.writeInt(len);
- bodyBuffer.writeBytes(((Buffer) body).getBytes());
- break;
- case VertxConstants.TYPE_BYTEARRAY:
- byte[] bytes = (byte[]) body;
- bodyBuffer.writeInt(bytes.length);
- bodyBuffer.writeBytes(bytes);
- break;
- case VertxConstants.TYPE_BYTE:
- bodyBuffer.writeByte((byte) body);
- break;
- case VertxConstants.TYPE_CHARACTER:
- bodyBuffer.writeChar((Character) body);
- break;
- case VertxConstants.TYPE_DOUBLE:
- bodyBuffer.writeDouble((double) body);
- break;
- case VertxConstants.TYPE_FLOAT:
- bodyBuffer.writeFloat((Float) body);
- break;
- case VertxConstants.TYPE_INT:
- bodyBuffer.writeInt((Integer) body);
- break;
- case VertxConstants.TYPE_LONG:
- bodyBuffer.writeLong((Long) body);
- break;
- case VertxConstants.TYPE_SHORT:
- bodyBuffer.writeShort((Short) body);
- break;
- case VertxConstants.TYPE_STRING:
- case VertxConstants.TYPE_PING:
- bodyBuffer.writeString((String) body);
- break;
- case VertxConstants.TYPE_JSON_OBJECT:
- bodyBuffer.writeString(((JsonObject) body).encode());
- break;
- case VertxConstants.TYPE_JSON_ARRAY:
- bodyBuffer.writeString(((JsonArray) body).encode());
- break;
- case VertxConstants.TYPE_REPLY_FAILURE:
- ReplyException except = (ReplyException) body;
- bodyBuffer.writeInt(except.failureType().toInt());
- bodyBuffer.writeInt(except.failureCode());
- bodyBuffer.writeString(except.getMessage());
- break;
- default:
- throw new IllegalArgumentException("Invalid body type: " + type);
- }
- }
-
- private int getMessageType(Message<?> message) {
-
- Object body = message.body();
-
- if (message instanceof PingMessage) {
- return VertxConstants.TYPE_PING;
- } else if (body instanceof Buffer) {
- return VertxConstants.TYPE_BUFFER;
- } else if (body instanceof Boolean) {
- return VertxConstants.TYPE_BOOLEAN;
- } else if (body instanceof byte[]) {
- return VertxConstants.TYPE_BYTEARRAY;
- } else if (body instanceof Byte) {
- return VertxConstants.TYPE_BYTE;
- } else if (body instanceof Character) {
- return VertxConstants.TYPE_CHARACTER;
- } else if (body instanceof Double) {
- return VertxConstants.TYPE_DOUBLE;
- } else if (body instanceof Float) {
- return VertxConstants.TYPE_FLOAT;
- } else if (body instanceof Integer) {
- return VertxConstants.TYPE_INT;
- } else if (body instanceof Long) {
- return VertxConstants.TYPE_LONG;
- } else if (body instanceof Short) {
- return VertxConstants.TYPE_SHORT;
- } else if (body instanceof String) {
- return VertxConstants.TYPE_STRING;
- } else if (body instanceof JsonArray) {
- return VertxConstants.TYPE_JSON_ARRAY;
- } else if (body instanceof JsonObject) {
- return VertxConstants.TYPE_JSON_OBJECT;
- } else if (body instanceof ReplyException) {
- return VertxConstants.TYPE_REPLY_FAILURE;
- }
- throw new IllegalArgumentException("Type not supported: " + message);
- }
-
- }
-
- @Override
- public String toString() {
- return "[IncomingVertxEventHandler(" + connectorName + "), queueName: " + queueName + " host: " + host + " port: " + port + " vertxAddress: " + vertxAddress + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java
deleted file mode 100644
index 8820c39..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/OutgoingVertxEventHandler.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.server.ConnectorService;
-import org.apache.activemq.artemis.core.server.Consumer;
-import org.apache.activemq.artemis.core.server.HandleStatus;
-import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.utils.ConfigurationHelper;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.eventbus.EventBus;
-import org.vertx.java.core.eventbus.ReplyException;
-import org.vertx.java.core.eventbus.ReplyFailure;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
-import org.vertx.java.platform.PlatformLocator;
-import org.vertx.java.platform.PlatformManager;
-import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
-
-class OutgoingVertxEventHandler implements Consumer, ConnectorService {
-
- private final String connectorName;
-
- private final String queueName;
-
- private final int port;
-
- private final String host;
-
- private final int quorumSize;
-
- private final String haGroup;
-
- private final String vertxAddress;
-
- private final boolean publish;
-
- private final PostOffice postOffice;
-
- private Queue queue = null;
-
- private Filter filter = null;
-
- private EventBus eventBus;
-
- private PlatformManager platformManager;
-
- private boolean isStarted = false;
-
- OutgoingVertxEventHandler(String connectorName, Map<String, Object> configuration, PostOffice postOffice) {
- this.connectorName = connectorName;
- this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null, configuration);
- this.postOffice = postOffice;
-
- this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
- this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", configuration);
- this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, configuration);
- this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", configuration);
- this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", configuration);
- this.publish = ConfigurationHelper.getBooleanProperty(VertxConstants.VERTX_PUBLISH, false, configuration);
- }
-
- @Override
- public void start() throws Exception {
- if (this.isStarted) {
- return;
- }
- System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
- if (quorumSize != -1) {
- platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
- } else {
- platformManager = PlatformLocator.factory.createPlatformManager(port, host);
- }
-
- eventBus = platformManager.vertx().eventBus();
-
- if (this.connectorName == null || this.connectorName.trim().equals("")) {
- throw new Exception("invalid connector name: " + this.connectorName);
- }
-
- if (this.queueName == null || this.queueName.trim().equals("")) {
- throw new Exception("invalid queue name: " + queueName);
- }
-
- SimpleString name = new SimpleString(this.queueName);
- Binding b = this.postOffice.getBinding(name);
- if (b == null) {
- throw new Exception(connectorName + ": queue " + queueName + " not found");
- }
- this.queue = (Queue) b.getBindable();
- this.queue.addConsumer(this);
-
- this.queue.deliverAsync();
- this.isStarted = true;
-
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": started");
- }
-
- @Override
- public void stop() throws Exception {
- if (!this.isStarted) {
- return;
- }
-
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": receive shutdown request");
-
- this.queue.removeConsumer(this);
-
- this.platformManager.stop();
- System.clearProperty("vertx.clusterManagerFactory");
- this.isStarted = false;
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": stopped");
- }
-
- @Override
- public boolean isStarted() {
- return this.isStarted;
- }
-
- @Override
- public String getName() {
- return this.connectorName;
- }
-
- @Override
- public HandleStatus handle(MessageReference ref) throws Exception {
- if (filter != null && !filter.match(ref.getMessage())) {
- return HandleStatus.NO_MATCH;
- }
-
- synchronized (this) {
- ref.handled();
-
- ServerMessage message = ref.getMessage();
-
- Object vertxMsgBody;
- // extract information from message
- Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
-
- if (type == null) {
- // log a warning and default to raw bytes
- ActiveMQVertxLogger.LOGGER.nonVertxMessage(message);
- type = VertxConstants.TYPE_RAWBYTES;
- }
-
- // from vertx
- vertxMsgBody = extractMessageBody(message, type);
-
- if (vertxMsgBody == null) {
- return HandleStatus.NO_MATCH;
- }
-
- // send to bus
- if (!publish) {
- eventBus.send(vertxAddress, vertxMsgBody);
- } else {
- eventBus.publish(vertxAddress, vertxMsgBody);
- }
-
- queue.acknowledge(ref);
-
- ActiveMQVertxLogger.LOGGER.debug(connectorName + ": forwarded to vertx: " + message.getMessageID());
- return HandleStatus.HANDLED;
- }
- }
-
- private Object extractMessageBody(ServerMessage message, Integer type) throws Exception {
- Object vertxMsgBody = null;
- ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
- switch (type) {
- case VertxConstants.TYPE_PING:
- case VertxConstants.TYPE_STRING:
- bodyBuffer.resetReaderIndex();
- vertxMsgBody = bodyBuffer.readString();
- break;
- case VertxConstants.TYPE_BUFFER:
- int len = bodyBuffer.readInt();
- byte[] bytes = new byte[len];
- bodyBuffer.readBytes(bytes);
- vertxMsgBody = new Buffer(bytes);
- break;
- case VertxConstants.TYPE_BOOLEAN:
- vertxMsgBody = bodyBuffer.readBoolean();
- break;
- case VertxConstants.TYPE_BYTEARRAY:
- int length = bodyBuffer.readInt();
- byte[] byteArray = new byte[length];
- bodyBuffer.readBytes(byteArray);
- vertxMsgBody = byteArray;
- break;
- case VertxConstants.TYPE_BYTE:
- vertxMsgBody = bodyBuffer.readByte();
- break;
- case VertxConstants.TYPE_CHARACTER:
- vertxMsgBody = bodyBuffer.readChar();
- break;
- case VertxConstants.TYPE_DOUBLE:
- vertxMsgBody = bodyBuffer.readDouble();
- break;
- case VertxConstants.TYPE_FLOAT:
- vertxMsgBody = bodyBuffer.readFloat();
- break;
- case VertxConstants.TYPE_INT:
- vertxMsgBody = bodyBuffer.readInt();
- break;
- case VertxConstants.TYPE_LONG:
- vertxMsgBody = bodyBuffer.readLong();
- break;
- case VertxConstants.TYPE_SHORT:
- vertxMsgBody = bodyBuffer.readShort();
- break;
- case VertxConstants.TYPE_JSON_OBJECT:
- vertxMsgBody = new JsonObject(bodyBuffer.readString());
- break;
- case VertxConstants.TYPE_JSON_ARRAY:
- vertxMsgBody = new JsonArray(bodyBuffer.readString());
- break;
- case VertxConstants.TYPE_REPLY_FAILURE:
- int failureType = bodyBuffer.readInt();
- int failureCode = bodyBuffer.readInt();
- String errMsg = bodyBuffer.readString();
- vertxMsgBody = new ReplyException(ReplyFailure.fromInt(failureType), failureCode, errMsg);
- break;
- case VertxConstants.TYPE_RAWBYTES:
- int size = bodyBuffer.readableBytes();
- byte[] rawBytes = new byte[size];
- bodyBuffer.readBytes(rawBytes);
- vertxMsgBody = rawBytes;
- break;
- default:
- ActiveMQVertxLogger.LOGGER.invalidVertxType(type);
- break;
- }
- return vertxMsgBody;
- }
-
- @Override
- public void proceedDeliver(MessageReference reference) throws Exception {
- // no op
- }
-
- @Override
- public Filter getFilter() {
- return this.filter;
- }
-
- @Override
- public String debug() {
- return null;
- }
-
- @Override
- public String toManagementString() {
- return null;
- }
-
- @Override
- public List<MessageReference> getDeliveringMessages() {
- return null;
- }
-
- @Override
- public void disconnect() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java
deleted file mode 100644
index e0d1537..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class VertxConstants {
-
- // org.vertx.java.core.eventbus.impl.MessageFactory
- public static final int TYPE_PING = 0;
- public static final int TYPE_BUFFER = 1;
- public static final int TYPE_BOOLEAN = 2;
- public static final int TYPE_BYTEARRAY = 3;
- public static final int TYPE_BYTE = 4;
- public static final int TYPE_CHARACTER = 5;
- public static final int TYPE_DOUBLE = 6;
- public static final int TYPE_FLOAT = 7;
- public static final int TYPE_INT = 8;
- public static final int TYPE_LONG = 9;
- public static final int TYPE_SHORT = 10;
- public static final int TYPE_STRING = 11;
- public static final int TYPE_JSON_OBJECT = 12;
- public static final int TYPE_JSON_ARRAY = 13;
- public static final int TYPE_REPLY_FAILURE = 100;
- public static final int TYPE_RAWBYTES = 200;
-
- public static final String PORT = "port";
- public static final String HOST = "host";
- public static final String QUEUE_NAME = "queue";
- public static final String VERTX_ADDRESS = "vertx-address";
- public static final String VERTX_PUBLISH = "publish";
- public static final String VERTX_QUORUM_SIZE = "quorum-size";
- public static final String VERTX_HA_GROUP = "ha-group";
-
- public static final Set<String> ALLOWABLE_INCOMING_CONNECTOR_KEYS;
- public static final Set<String> REQUIRED_INCOMING_CONNECTOR_KEYS;
- public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
- public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
- public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
- public static final String VERTX_MESSAGE_REPLYADDRESS = "VertxMessageReplyAddress";
- public static final String VERTX_MESSAGE_TYPE = "VertxMessageType";
-
- static {
- ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<>();
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PORT);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(HOST);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
- ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
-
- REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<>();
- REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
-
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<>();
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PORT);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(HOST);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_PUBLISH);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
- ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
-
- REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<>();
- REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java
deleted file mode 100644
index 03afe20..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxIncomingConnectorServiceFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.server.ConnectorService;
-import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
-
-public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFactory {
-
- @Override
- public ConnectorService createConnectorService(String connectorName,
- Map<String, Object> configuration,
- StorageManager storageManager,
- PostOffice postOffice,
- ScheduledExecutorService scheduledThreadPool) {
-
- return new IncomingVertxEventHandler(connectorName, configuration, storageManager, postOffice);
-
- }
-
- @Override
- public Set<String> getAllowableProperties() {
- return VertxConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS;
- }
-
- @Override
- public Set<String> getRequiredProperties() {
- return VertxConstants.REQUIRED_INCOMING_CONNECTOR_KEYS;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java
deleted file mode 100644
index 2ae0848..0000000
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/VertxOutgoingConnectorServiceFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.integration.vertx;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.PostOffice;
-import org.apache.activemq.artemis.core.server.ConnectorService;
-import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
-
-public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFactory {
-
- @Override
- public ConnectorService createConnectorService(String connectorName,
- Map<String, Object> configuration,
- StorageManager storageManager,
- PostOffice postOffice,
- ScheduledExecutorService scheduledThreadPool) {
- return new OutgoingVertxEventHandler(connectorName, configuration, postOffice);
- }
-
- @Override
- public Set<String> getAllowableProperties() {
- return VertxConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
- }
-
- @Override
- public Set<String> getRequiredProperties() {
- return VertxConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 454aad2..d182bbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,6 @@
<module>artemis-server-osgi</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>artemis-distribution</module>
<module>artemis-tools</module>
<module>tests</module>
@@ -736,7 +735,6 @@
<module>artemis-jdbc-store</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@@ -772,7 +770,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>examples</module>
<module>tests</module>
<module>artemis-distribution</module>
@@ -831,7 +828,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@@ -874,7 +870,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>tests</module>
</modules>
<properties>
@@ -909,7 +904,6 @@
<module>artemis-maven-plugin</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
- <module>integration/activemq-vertx-integration</module>
<module>tests</module>
<module>examples</module>
</modules>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 8dd8b16..afdaee2 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -30,8 +30,6 @@
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
<karaf.version>4.0.6</karaf.version>
<pax.exam.version>4.9.1</pax.exam.version>
- <vertx.version>2.1.6</vertx.version>
- <vertx.testtools.version>2.0.3-final</vertx.testtools.version>
</properties>
<repositories>
@@ -123,11 +121,6 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>artemis-vertx-integration</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
@@ -248,37 +241,6 @@
<scope>test</scope>
</dependency>
- <!--Vertx provided dependencies-->
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-core</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-platform</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-hazelcast</artifactId>
- <version>${vertx.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>testtools</artifactId>
- <version>${vertx.testtools.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c0fe1876/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 7a12f42..91db1d2a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
@@ -57,7 +58,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.vertx.java.core.impl.ConcurrentHashSet;
/**
* QT