You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/27 14:42:46 UTC
[activemq-artemis] 01/03: ARTEMIS-2390 JMSMessageID header can be
null when messages are cross-protocol
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c66d62e4b09f7a529c4067f83a333a55d11667d7
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Jun 25 20:21:40 2019 +0800
ARTEMIS-2390 JMSMessageID header can be null when messages are cross-protocol
If a JMS client (be it OpenWire, AMQP, or core JMS) receives a message that
was sent using a different protocol, the JMSMessageID may be null even though
the JMS client expects it to be set.
---
.../org/apache/activemq/artemis/utils/UUID.java | 4 +
.../activemq/artemis/utils/UUIDGenerator.java | 13 ++
.../protocol/amqp/converter/AmqpCoreConverter.java | 12 +-
.../protocol/amqp/converter/CoreAmqpConverter.java | 4 +
.../openwire/OpenWireMessageConverter.java | 6 +-
.../management/impl/ManagementServiceImpl.java | 3 +-
.../crossprotocol/MessageIDMultiProtocolTest.java | 146 +++++++++++++++++++++
.../artemis/tests/unit/util/UUIDGeneratorTest.java | 10 ++
8 files changed, 195 insertions(+), 3 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 1d84fea..c3f99a4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -107,6 +107,10 @@ public final class UUID {
mId[UUID.INDEX_VARIATION] |= (byte) 0x80;
}
+ public UUID(final byte[] data) { // builds a UUID directly from an existing byte array
+ mId = data; // keeps the caller's array itself: no copy, and no length or version/variant adjustment is done here
+ }
+
public byte[] asBytes() {
return mId;
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
index c111617..3d4f10e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
import java.net.NetworkInterface;
import java.net.SocketException;
+import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@@ -109,6 +110,18 @@ public final class UUIDGenerator {
return new UUID(UUID.TYPE_TIME_BASED, contents);
}
+ public UUID fromJavaUUID(java.util.UUID uuid) { // converts a java.util.UUID into this package's UUID type
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+
+ ByteBuffer buffer = ByteBuffer.allocate(16); // room for the two 8-byte halves
+ buffer.putLong(msb); // most significant half first; ByteBuffer writes big-endian by default
+ buffer.putLong(lsb);
+ byte[] contents = buffer.array(); // backing array of the heap buffer, passed on without copying
+
+ return new UUID(contents);
+ }
+
public byte[] generateDummyAddress() {
Random rnd = getRandomNumberGenerator();
byte[] dummy = new byte[6];
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 32d3596..739d437 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -55,6 +55,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@@ -67,6 +68,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
@@ -311,7 +313,15 @@ public class AmqpCoreConverter {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
+ //core jms clients get JMSMessageID from UserID which is a UUID object
+ if (properties.getMessageId() instanceof UUID) {
+ //AMQP's message ID can be a UUID, keep it
+ jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId()));
+ } else {
+ jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID());
+ }
}
+
Binary userId = properties.getUserId();
if (userId != null) {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
@@ -345,6 +355,7 @@ public class AmqpCoreConverter {
}
}
Object correlationID = properties.getCorrelationId();
+
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
@@ -374,7 +385,6 @@ public class AmqpCoreConverter {
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
}
}
-
return jms;
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 1099d51..453b7ec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -159,6 +159,10 @@ public class CoreAmqpConverter {
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setMessageId(messageId);
}
+ } else {
+ if (message.getInnerMessage().getUserID() != null) {
+ properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
+ }
}
Destination destination = message.getJMSDestination();
if (destination != null) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index c6c91f3..63082d9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -187,6 +187,8 @@ public final class OpenWireMessageConverter {
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
+ coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
+
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
final ByteSequence producerIdBytes = marshaller.marshal(producerId);
@@ -629,7 +631,9 @@ public final class OpenWireMessageConverter {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
} else {
- mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1");
+ //JMSMessageID should start with "ID:"
+ String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
+ mid = new MessageId(midd);
}
amqMsg.setMessageId(mid);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 0cb0f19..2ba8e4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -801,9 +801,10 @@ public class ManagementServiceImpl implements ManagementService {
// CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
// change is a nice to have for my point of view. I suggested it for completeness. The application could
// always supply unique correl ids on the request and achieve the same effect. I'd be happy to drop this part.
- Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID);
+ Object underlying = request.getStringProperty(NATIVE_MESSAGE_ID) != null ? request.getStringProperty(NATIVE_MESSAGE_ID) : request.getUserID();
correlationId = underlying == null ? null : String.valueOf(underlying);
}
+
return correlationId;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java
new file mode 100644
index 0000000..e652f79
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Arrays;
+
+import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+@RunWith(Parameterized.class) // the test runs once per {sender, consumer} pair returned by data()
+public class MessageIDMultiProtocolTest extends OpenWireTestBase { // checks JMSMessageID and JMSCorrelationID when a message is sent with one protocol and consumed with another
+
+ String protocolSender; // protocol name used to build senderCF
+ String protocolConsumer; // protocol name used to build consumerCF
+ ConnectionFactory senderCF;
+ ConnectionFactory consumerCF;
+ private static final SimpleString queueName = SimpleString.toSimpleString("MessageIDueueTest"); // queue created in setupQueue() and used by both sender and consumer
+
+ public MessageIDMultiProtocolTest(String protocolSender, String protocolConsumer) { // receives one row of data()
+ this.protocolSender = protocolSender;
+ this.protocolConsumer = protocolConsumer;
+ }
+
+ @Parameterized.Parameters(name = "sender={0},consumer={1}")
+ public static Iterable<Object[]> data() { // all nine combinations of OPENWIRE, CORE and AMQP as {sender, consumer}
+ return Arrays.asList(new Object[][]{
+ {"OPENWIRE", "OPENWIRE"},
+ {"OPENWIRE", "CORE"},
+ {"OPENWIRE", "AMQP"},
+ {"CORE", "OPENWIRE"},
+ {"CORE", "CORE"},
+ {"CORE", "AMQP"},
+ {"AMQP", "OPENWIRE"},
+ {"AMQP", "CORE"},
+ {"AMQP", "AMQP"},
+ });
+ }
+
+
+ @Before
+ public void setupCF() { // one connection factory per side, both built from the same urlString
+ senderCF = createConnectionFactory(protocolSender, urlString);
+ consumerCF = createConnectionFactory(protocolConsumer, urlString);
+ }
+
+ @Before
+ public void setupQueue() throws Exception {
+ Wait.assertTrue(server::isStarted); // wait for the server before touching it
+ Wait.assertTrue(server::isActive);
+ this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true); // ANYCAST queue; presumably address and queue name are both queueName - verify the remaining positional flags against the createQueue overload
+ }
+
+
+ @Test
+ public void testMessageIDNotNullCorrelationIDPreserved() throws Throwable { // the received message must carry a non-null "ID:"-prefixed JMSMessageID and the sender's correlation ID
+ Connection senderConn = senderCF.createConnection();
+ Connection consumerConn = consumerCF.createConnection();
+ consumerConn.setClientID("consumer");
+
+ try (Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+ Queue senderDestination = senderSession.createQueue(queueName.toString());
+ MessageProducer senderProducer = senderSession.createProducer(senderDestination);
+ Message sentMessage = senderSession.createMessage();
+ sentMessage.setJMSCorrelationID("ID:MessageIDCorrelationId");
+ senderProducer.send(sentMessage);
+ senderConn.start();
+
+ String sentMid = sentMessage.getJMSMessageID(); // read after send(), i.e. the ID as seen on the sending side
+
+ try (Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+ Destination consumerDestination = consumerSess.createQueue(queueName.toString());
+ MessageConsumer consumer = consumerSess.createConsumer(consumerDestination);
+ consumerConn.start();
+
+ Message receivedMessage = consumer.receive(3000); // wait up to 3 seconds for the message
+ Assert.assertNotNull(receivedMessage);
+
+ Assert.assertEquals(sentMessage.getJMSCorrelationID(), receivedMessage.getJMSCorrelationID()); // correlation ID must survive the protocol conversion
+
+ String messageId = receivedMessage.getJMSMessageID();
+ Assert.assertNotNull(messageId);
+
+ Assert.assertTrue(messageId.startsWith("ID:"));
+
+ System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + messageId); // received ID, printed for manual comparison
+ System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + sentMid); // sent ID
+
+ if (protocolConsumer.equals(protocolSender)) {
+ //the same JMSMessageID is only guaranteed when sender and consumer use the same protocol
+ assertEquals(sentMid, messageId);
+ }
+
+ //specific case [CORE]->[AMQP]: the sent JMSMessageID is expected to be preserved here as well
+ if ("CORE".equals(protocolSender) && "AMQP".equals(protocolConsumer)) {
+ assertEquals(sentMid, messageId);
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace(); // print the failure, then rethrow so the test still fails
+ throw e;
+ } finally {
+ try {
+ senderConn.close(); // both connections are always closed; a close failure is only printed
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ try {
+ consumerConn.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
index 46a9065..2eaa419 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
@@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Test;
+import java.util.UUID;
+
public class UUIDGeneratorTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
@@ -33,6 +35,14 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
// Public --------------------------------------------------------
@Test
+ public void testFromJavaUUID() throws Exception { // a converted UUID must render the same string as the java.util.UUID it came from
+ UUID javaId = UUID.randomUUID(); // java.util.UUID, per the import added in this file
+ UUIDGenerator gen = UUIDGenerator.getInstance();
+ org.apache.activemq.artemis.utils.UUID nativeId = gen.fromJavaUUID(javaId); // fully qualified because the simple name UUID refers to java.util.UUID here
+ assertEquals(javaId.toString(), nativeId.toString());
+ }
+
+ @Test
public void testGetHardwareAddress() throws Exception {
byte[] bytes = UUIDGenerator.getHardwareAddress();
Assert.assertNotNull(bytes);