You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2017/09/06 10:59:16 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
Repository: activemq-artemis
Updated Branches:
refs/heads/master 125bd41f9 -> f8ccb6d31
ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/16dfd777
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/16dfd777
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/16dfd777
Branch: refs/heads/master
Commit: 16dfd777b85197786675a34286e58da321ab22c0
Parents: 125bd41
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Sep 5 17:27:27 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Sep 6 11:21:56 2017 +0100
----------------------------------------------------------------------
.../activemq/artemis/api/core/ICoreMessage.java | 2 +-
.../activemq/artemis/api/core/Message.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 8 ++---
.../core/protocol/mqtt/MQTTSessionCallback.java | 4 +--
.../artemis/core/protocol/mqtt/MQTTUtil.java | 4 +++
.../impl/openmbean/OpenTypeSupport.java | 2 +-
.../integration/mqtt/imported/MQTTTest.java | 37 ++++++++++++++++++++
7 files changed, 49 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
index f49ef68..fe2044b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java
@@ -77,7 +77,7 @@ public interface ICoreMessage extends Message {
map.put("userID", "ID:" + userID.toString());
}
- map.put("address", getAddress());
+ map.put("address", getAddress() == null ? "" : getAddress());
map.put("type", getType());
map.put("durable", isDurable());
map.put("expiration", getExpiration());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 58433ce..61d887e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -616,7 +616,7 @@ public interface Message {
map.put("userID", "ID:" + userID.toString());
}
- map.put("address", getAddress());
+ map.put("address", getAddress() == null ? "" : getAddress());
map.put("durable", isDurable());
map.put("expiration", getExpiration());
map.put("timestamp", getTimestamp());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index e23385c..eb9c631 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -24,12 +24,12 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -112,7 +112,7 @@ public class MQTTPublishManager {
* to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
* the PubAck or PubRec message id. *
*/
- protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
+ protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) {
sendPubRelMessage(message);
@@ -257,8 +257,8 @@ public class MQTTPublishManager {
}
}
- private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
- String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
+ private void sendServerMessage(int messageId, ICoreMessage message, int deliveryCount, int qos) {
+ String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress().toString(), session.getWildcardConfiguration());
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
ByteBuf payload;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index a5b908f..21b1f2b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -17,10 +17,8 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
-
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -49,7 +47,7 @@ public class MQTTSessionCallback implements SessionCallback {
ServerConsumer consumer,
int deliveryCount) {
try {
- session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
+ session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
} catch (Exception e) {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 76664f6..098a756 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -84,6 +84,10 @@ public class MQTTUtil {
private static final MQTTLogger logger = MQTTLogger.LOGGER;
public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
+ if (filter == null) {
+ return "";
+ }
+
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index e7df48b..0c781b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -137,7 +137,7 @@ public final class OpenTypeSupport {
} else {
rc.put(CompositeDataConstants.USER_ID, "");
}
- rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString());
+ rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
rc.put(CompositeDataConstants.TYPE, m.getType());
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/16dfd777/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 794b002..e3c4856 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -25,6 +25,7 @@ import javax.jms.Session;
import java.io.EOFException;
import java.lang.reflect.Field;
import java.net.ProtocolException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -48,6 +49,11 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
@@ -71,6 +77,8 @@ public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
+ private static final String AMQP_URI = "tcp://localhost:61616";
+
@Override
@Before
public void setUp() throws Exception {
@@ -1162,6 +1170,35 @@ public class MQTTTest extends MQTTTestSupport {
doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
}
+ @Test(timeout = 60 * 1000)
+ public void testLinkRouteAmqpReceiveMQTT() throws Exception {
+ AmqpClient client = new AmqpClient(new URI(AMQP_URI), null, null);
+ AmqpConnection connection = client.connect();
+
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender("test", true);
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
+ } finally {
+ connection.close();
+ }
+
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setClientId("TestClient");
+ BlockingConnection blockingConnection = mqtt.blockingConnection();
+ try {
+ blockingConnection.connect();
+ Topic t = new Topic("test", QoS.AT_LEAST_ONCE);
+ blockingConnection.subscribe(new Topic[] {t});
+ assertNotNull(blockingConnection.receive(5, TimeUnit.SECONDS));
+ } finally {
+ blockingConnection.kill();
+ }
+ }
+
public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
[2/2] activemq-artemis git commit: This closes #1512 ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
Posted by an...@apache.org.
This closes #1512 ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f8ccb6d3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f8ccb6d3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f8ccb6d3
Branch: refs/heads/master
Commit: f8ccb6d31dbe5ec1016c753cbd035de15c912a2d
Parents: 125bd41 16dfd77
Author: Andy Taylor <an...@gmail.com>
Authored: Wed Sep 6 11:58:52 2017 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Wed Sep 6 11:58:52 2017 +0100
----------------------------------------------------------------------
.../activemq/artemis/api/core/ICoreMessage.java | 2 +-
.../activemq/artemis/api/core/Message.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 8 ++---
.../core/protocol/mqtt/MQTTSessionCallback.java | 4 +--
.../artemis/core/protocol/mqtt/MQTTUtil.java | 4 +++
.../impl/openmbean/OpenTypeSupport.java | 2 +-
.../integration/mqtt/imported/MQTTTest.java | 37 ++++++++++++++++++++
7 files changed, 49 insertions(+), 10 deletions(-)
----------------------------------------------------------------------