You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/27 14:42:46 UTC

[activemq-artemis] 01/03: ARTEMIS-2390 JMSMessageID header can be null when messages are cross-protocol

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c66d62e4b09f7a529c4067f83a333a55d11667d7
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Jun 25 20:21:40 2019 +0800

    ARTEMIS-2390 JMSMessageID header can be null when messages are cross-protocol
    
    If a JMS client (OpenWire, AMQP, or Core JMS) receives a message that
    was sent over a different protocol, the JMSMessageID may be null even
    though the JMS client expects it to be set.
---
 .../org/apache/activemq/artemis/utils/UUID.java    |   4 +
 .../activemq/artemis/utils/UUIDGenerator.java      |  13 ++
 .../protocol/amqp/converter/AmqpCoreConverter.java |  12 +-
 .../protocol/amqp/converter/CoreAmqpConverter.java |   4 +
 .../openwire/OpenWireMessageConverter.java         |   6 +-
 .../management/impl/ManagementServiceImpl.java     |   3 +-
 .../crossprotocol/MessageIDMultiProtocolTest.java  | 146 +++++++++++++++++++++
 .../artemis/tests/unit/util/UUIDGeneratorTest.java |  10 ++
 8 files changed, 195 insertions(+), 3 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
index 1d84fea..c3f99a4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUID.java
@@ -107,6 +107,10 @@ public final class UUID {
       mId[UUID.INDEX_VARIATION] |= (byte) 0x80;
    }
 
+   public UUID(final byte[] data) {
+      mId = data;
+   }
+
    public byte[] asBytes() {
       return mId;
    }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
index c111617..3d4f10e 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/UUIDGenerator.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
 
 import java.net.NetworkInterface;
 import java.net.SocketException;
+import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -109,6 +110,18 @@ public final class UUIDGenerator {
       return new UUID(UUID.TYPE_TIME_BASED, contents);
    }
 
+   public UUID fromJavaUUID(java.util.UUID uuid) {
+      long msb = uuid.getMostSignificantBits();
+      long lsb = uuid.getLeastSignificantBits();
+
+      ByteBuffer buffer = ByteBuffer.allocate(16);
+      buffer.putLong(msb);
+      buffer.putLong(lsb);
+      byte[] contents = buffer.array();
+
+      return new UUID(contents);
+   }
+
    public byte[] generateDummyAddress() {
       Random rnd = getRandomNumberGenerator();
       byte[] dummy = new byte[6];
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 32d3596..739d437 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -55,6 +55,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -67,6 +68,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
@@ -311,7 +313,15 @@ public class AmqpCoreConverter {
       if (properties != null) {
          if (properties.getMessageId() != null) {
             jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
+            //core jms clients get JMSMessageID from UserID which is a UUID object
+            if (properties.getMessageId() instanceof UUID) {
+               //AMQP's message ID can be a UUID, keep it
+               jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId()));
+            } else {
+               jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID());
+            }
          }
+
          Binary userId = properties.getUserId();
          if (userId != null) {
             jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
@@ -345,6 +355,7 @@ public class AmqpCoreConverter {
             }
          }
          Object correlationID = properties.getCorrelationId();
+
          if (correlationID != null) {
             try {
                jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
@@ -374,7 +385,6 @@ public class AmqpCoreConverter {
             jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
          }
       }
-
       return jms;
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 1099d51..453b7ec 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -159,6 +159,10 @@ public class CoreAmqpConverter {
          } catch (ActiveMQAMQPIllegalStateException e) {
             properties.setMessageId(messageId);
          }
+      } else {
+         if (message.getInnerMessage().getUserID() != null) {
+            properties.setMessageId("ID:" + message.getInnerMessage().getUserID().toString());
+         }
       }
       Destination destination = message.getJMSDestination();
       if (destination != null) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index c6c91f3..63082d9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -187,6 +187,8 @@ public final class OpenWireMessageConverter {
       midBytes.compact();
       coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
 
+      coreMessage.setUserID(UUIDGenerator.getInstance().generateUUID());
+
       final ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
          final ByteSequence producerIdBytes = marshaller.marshal(producerId);
@@ -629,7 +631,9 @@ public final class OpenWireMessageConverter {
          ByteSequence midSeq = new ByteSequence(midBytes);
          mid = (MessageId) marshaller.unmarshal(midSeq);
       } else {
-         mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1");
+         //JMSMessageID must start with "ID:"
+         String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
+         mid = new MessageId(midd);
       }
 
       amqMsg.setMessageId(mid);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 0cb0f19..2ba8e4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -801,9 +801,10 @@ public class ManagementServiceImpl implements ManagementService {
          // CoreMessage#getUserId returns UUID, so to implement this part a alternative API that returned object. This part of the
          // change is a nice to have for my point of view. I suggested it for completeness.  The application could
          // always supply unique correl ids on the request and achieve the same effect.  I'd be happy to drop this part.
-         Object underlying = request.getUserID() != null ? request.getUserID() : request.getStringProperty(NATIVE_MESSAGE_ID);
+         Object underlying = request.getStringProperty(NATIVE_MESSAGE_ID) != null ? request.getStringProperty(NATIVE_MESSAGE_ID) : request.getUserID();
          correlationId = underlying == null ? null : String.valueOf(underlying);
       }
+
       return correlationId;
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java
new file mode 100644
index 0000000..e652f79
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/crossprotocol/MessageIDMultiProtocolTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.crossprotocol;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Arrays;
+
+import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+@RunWith(Parameterized.class)
+public class MessageIDMultiProtocolTest extends OpenWireTestBase {
+
+   String protocolSender;
+   String protocolConsumer;
+   ConnectionFactory senderCF;
+   ConnectionFactory consumerCF;
+   private static final SimpleString queueName = SimpleString.toSimpleString("MessageIDueueTest");
+
+   public MessageIDMultiProtocolTest(String protocolSender, String protocolConsumer) {
+      this.protocolSender = protocolSender;
+      this.protocolConsumer = protocolConsumer;
+   }
+
+   @Parameterized.Parameters(name = "sender={0},consumer={1}")
+   public static Iterable<Object[]> data() {
+      return Arrays.asList(new Object[][]{
+         {"OPENWIRE", "OPENWIRE"},
+         {"OPENWIRE", "CORE"},
+         {"OPENWIRE", "AMQP"},
+         {"CORE", "OPENWIRE"},
+         {"CORE", "CORE"},
+         {"CORE", "AMQP"},
+         {"AMQP", "OPENWIRE"},
+         {"AMQP", "CORE"},
+         {"AMQP", "AMQP"},
+      });
+   }
+
+
+   @Before
+   public void setupCF() {
+      senderCF = createConnectionFactory(protocolSender, urlString);
+      consumerCF = createConnectionFactory(protocolConsumer, urlString);
+   }
+
+   @Before
+   public void setupQueue() throws Exception {
+      Wait.assertTrue(server::isStarted);
+      Wait.assertTrue(server::isActive);
+      this.server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, -1, false, true);
+   }
+
+
+   @Test
+   public void testMessageIDNotNullCorrelationIDPreserved() throws Throwable {
+      Connection senderConn = senderCF.createConnection();
+      Connection consumerConn = consumerCF.createConnection();
+      consumerConn.setClientID("consumer");
+
+      try (Session senderSession = senderConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+         Queue senderDestination = senderSession.createQueue(queueName.toString());
+         MessageProducer senderProducer = senderSession.createProducer(senderDestination);
+         Message sentMessage = senderSession.createMessage();
+         sentMessage.setJMSCorrelationID("ID:MessageIDCorrelationId");
+         senderProducer.send(sentMessage);
+         senderConn.start();
+
+         String sentMid = sentMessage.getJMSMessageID();
+
+         try (Session consumerSess = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+            Destination consumerDestination = consumerSess.createQueue(queueName.toString());
+            MessageConsumer consumer = consumerSess.createConsumer(consumerDestination);
+            consumerConn.start();
+
+            Message receivedMessage = consumer.receive(3000);
+            Assert.assertNotNull(receivedMessage);
+
+            Assert.assertEquals(sentMessage.getJMSCorrelationID(), receivedMessage.getJMSCorrelationID());
+
+            String messageId = receivedMessage.getJMSMessageID();
+            Assert.assertNotNull(messageId);
+
+            Assert.assertTrue(messageId.startsWith("ID:"));
+
+            System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + messageId);
+            System.out.println("[" + protocolSender + "][" + protocolConsumer + "] " + sentMid);
+
+            if (protocolConsumer.equals(protocolSender)) {
+               //the same JMSMessageID is only guaranteed when sender and consumer use the same protocol
+               assertEquals(sentMid, messageId);
+            }
+
+            //specific case [CORE]->[AMQP]
+            if ("CORE".equals(protocolSender) && "AMQP".equals(protocolConsumer)) {
+               assertEquals(sentMid, messageId);
+            }
+         }
+      } catch (Throwable e) {
+         e.printStackTrace();
+         throw e;
+      } finally {
+         try {
+            senderConn.close();
+         } catch (Throwable e) {
+            e.printStackTrace();
+         }
+         try {
+            consumerConn.close();
+         } catch (Throwable e) {
+            e.printStackTrace();
+         }
+      }
+   }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
index 46a9065..2eaa419 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/UUIDGeneratorTest.java
@@ -21,6 +21,8 @@ import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.UUID;
+
 public class UUIDGeneratorTest extends ActiveMQTestBase {
    // Constants -----------------------------------------------------
 
@@ -33,6 +35,14 @@ public class UUIDGeneratorTest extends ActiveMQTestBase {
    // Public --------------------------------------------------------
 
    @Test
+   public void testFromJavaUUID() throws Exception {
+      UUID javaId = UUID.randomUUID();
+      UUIDGenerator gen = UUIDGenerator.getInstance();
+      org.apache.activemq.artemis.utils.UUID nativeId = gen.fromJavaUUID(javaId);
+      assertEquals(javaId.toString(), nativeId.toString());
+   }
+
+   @Test
    public void testGetHardwareAddress() throws Exception {
       byte[] bytes = UUIDGenerator.getHardwareAddress();
       Assert.assertNotNull(bytes);