You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/07/12 18:19:06 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6353

Repository: activemq
Updated Branches:
  refs/heads/master 6c2ce67ff -> 16c487a7b


https://issues.apache.org/jira/browse/AMQ-6353

Fix and test for encoding the correlation Id value to the ActiveMQ
message object's string value

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/16c487a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/16c487a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/16c487a7

Branch: refs/heads/master
Commit: 16c487a7b9ba54c013254a1c642d31eba86acc8b
Parents: 6c2ce67
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 12 14:18:47 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 12 14:19:01 2016 -0400

----------------------------------------------------------------------
 .../amqp/message/AMQPMessageIdHelper.java       |   1 -
 .../amqp/message/ActiveMQJMSVendor.java         |   2 +-
 .../amqp/message/InboundTransformer.java        |   2 +-
 .../message/JMSMappingOutboundTransformer.java  |   9 +-
 .../transport/amqp/client/AmqpMessage.java      |  54 ++++-
 .../AmqpCorrelationIdPreservationTest.java      | 197 +++++++++++++++++++
 6 files changed, 259 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
index dad365d..5afa995 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java
@@ -46,7 +46,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
  *
  * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
  * ulong but can't be converted into the indicated format, an exception will be thrown.
- *
  */
 public class AMQPMessageIdHelper {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
index a976240..216daa9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index 2223b5a..e883bcf 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -207,7 +207,7 @@ public abstract class InboundTransformer {
                 jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
             }
             if (properties.getCorrelationId() != null) {
-                jms.setJMSCorrelationID(properties.getCorrelationId().toString());
+                jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
             }
             if (properties.getContentType() != null) {
                 jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 7e6af2f..c9a94fa 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -211,7 +211,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
         }
         if (msg.getJMSCorrelationID() != null) {
-            props.setCorrelationId(msg.getJMSCorrelationID());
+            String correlationId = msg.getJMSCorrelationID();
+            try {
+                props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+            } catch (AmqpProtocolException e) {
+                props.setCorrelationId(correlationId);
+            }
         }
         if (msg.getJMSExpiration() != 0) {
             long ttl = msg.getJMSExpiration() - System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index d974690..b954e04 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -225,6 +225,58 @@ public class AmqpMessage {
     }
 
     /**
+     * Sets the CorrelationId property on an outbound message using the provided String
+     *
+     * @param correlationId
+     *        the String Correlation ID value to set.
+     */
+    public void setCorrelationId(String correlationId) {
+        checkReadOnly();
+        lazyCreateProperties();
+        getWrappedMessage().setCorrelationId(correlationId);
+    }
+
+    /**
+     * Return the set CorrelationId value in String form, if there are no properties
+     * in the given message return null.
+     *
+     * @return the set correlation ID in String form or null if not set.
+     */
+    public String getCorrelationId() {
+        if (message.getProperties() == null) {
+            return null;
+        }
+
+        return message.getProperties().getCorrelationId().toString();
+    }
+
+    /**
+     * Return the set CorrelationId value in the original form, if there are no properties
+     * in the given message return null.
+     *
+     * @return the set message ID in its original form or null if not set.
+     */
+    public Object getRawCorrelationId() {
+        if (message.getProperties() == null) {
+            return null;
+        }
+
+        return message.getProperties().getCorrelationId();
+    }
+
+    /**
+     * Sets the CorrelationId property on an outbound message using the provided value
+     *
+     * @param correlationId
+     *        the correlation ID value to set.
+     */
+    public void setRawCorrelationId(Object correlationId) {
+        checkReadOnly();
+        lazyCreateProperties();
+        getWrappedMessage().setCorrelationId(correlationId);
+    }
+
+    /**
      * Sets the GroupId property on an outbound message using the provided String
      *
      * @param messageId

http://git-wip-us.apache.org/repos/asf/activemq/blob/16c487a7/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java
new file mode 100644
index 0000000..b155060
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.JMSInteroperabilityTest;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that the AMQP CorrelationId value and type are preserved.
+ */
+@RunWith(Parameterized.class)
+public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
+
+    private final String transformer;
+
+    @Parameters(name="Transformer->{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"jms"},
+                {"native"},
+                {"raw"},
+            });
+    }
+
+    public AmqpCorrelationIdPreservationTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return transformer;
+    }
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    @Test(timeout = 60000)
+    public void testStringCorrelationIdIsPreserved() throws Exception {
+        doTestCorrelationIdPreservation("msg-id-string:1");
+    }
+
+    @Test(timeout = 60000)
+    public void testStringCorrelationIdIsPreservedAfterRestart() throws Exception {
+        doTestCorrelationIdPreservationOnBrokerRestart("msg-id-string:1");
+    }
+
+    @Test(timeout = 60000)
+    public void testUUIDCorrelationIdIsPreserved() throws Exception {
+        doTestCorrelationIdPreservation(UUID.randomUUID());
+    }
+
+    @Test(timeout = 60000)
+    public void testUUIDCorrelationIdIsPreservedAfterRestart() throws Exception {
+        doTestCorrelationIdPreservationOnBrokerRestart(UUID.randomUUID());
+    }
+
+    @Test(timeout = 60000)
+    public void testUnsignedLongCorrelationIdIsPreserved() throws Exception {
+        doTestCorrelationIdPreservation(new UnsignedLong(255l));
+    }
+
+    @Test(timeout = 60000)
+    public void testUnsignedLongCorrelationIdIsPreservedAfterRestart() throws Exception {
+        doTestCorrelationIdPreservationOnBrokerRestart(new UnsignedLong(255l));
+    }
+
+    @Test(timeout = 60000)
+    public void testBinaryLongCorrelationIdIsPreserved() throws Exception {
+        byte[] payload = new byte[32];
+        for (int i = 0; i < 32; ++i) {
+            payload[i] = (byte) ('a' + i);
+        }
+
+        doTestCorrelationIdPreservation(new Binary(payload));
+    }
+
+    @Test(timeout = 60000)
+    public void testBinaryLongCorrelationIdIsPreservedAfterRestart() throws Exception {
+        byte[] payload = new byte[32];
+        for (int i = 0; i < 32; ++i) {
+            payload[i] = (byte) ('a' + i);
+        }
+
+        doTestCorrelationIdPreservationOnBrokerRestart(new Binary(payload));
+    }
+
+    @Test(timeout = 60000)
+    public void testStringCorrelationIdPrefixIsPreserved() throws Exception {
+        doTestCorrelationIdPreservation("ID:msg-id-string:1");
+    }
+
+    public void doTestCorrelationIdPreservation(Object messageId) throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setRawCorrelationId(messageId);
+        message.setText("Test-Message");
+
+        sender.send(message);
+
+        sender.close();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("Should have got a message", received);
+        assertEquals(received.getRawCorrelationId().getClass(), messageId.getClass());
+        assertEquals(messageId, received.getRawCorrelationId());
+        receiver.close();
+        connection.close();
+    }
+
+    public void doTestCorrelationIdPreservationOnBrokerRestart(Object messageId) throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setRawCorrelationId(messageId);
+        message.setText("Test-Message");
+        message.setDurable(true);
+
+        sender.send(message);
+
+        sender.close();
+        connection.close();
+
+        restartBroker();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        connection = client.connect();
+        session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull("Should have got a message", received);
+        assertEquals(received.getRawCorrelationId().getClass(), messageId.getClass());
+        assertEquals(messageId, received.getRawCorrelationId());
+        receiver.close();
+        connection.close();
+    }
+}