You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 03:54:16 UTC

[activemq] branch main updated: Fix serialization of RemoveInfo advisory message for AMQP consumers

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new e0a37a5c3 Fix serialization of RemoveInfo advisory message for AMQP consumers
     new 3bcbc5b84 Merge pull request #919 from lucastetreault/AMQ-9119
e0a37a5c3 is described below

commit e0a37a5c304367e693d180b014f679ad68ca0b80
Author: Lucas Tétreault <te...@amazon.com>
AuthorDate: Wed Oct 12 19:01:06 2022 -0700

    Fix serialization of RemoveInfo advisory message for AMQP consumers
---
 .../message/JMSMappingOutboundTransformer.java     |  2 +-
 .../activemq/transport/amqp/AmqpAdvisoryTest.java  | 78 ++++++++++++++++++++++
 .../activemq/transport/amqp/AmqpTestSupport.java   |  3 +-
 3 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 67d034447..dbae00f99 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -417,7 +417,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
             		removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
             		removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
             		removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
-            		removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
+            		removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId().getValue());
             	}
             	
             	body = new AmqpValue(removeMap);
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java
new file mode 100644
index 000000000..5b32c54d9
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.amqp;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AmqpAdvisoryTest extends AmqpTestSupport { // Checks that consumer advisories for a queue reach an AMQP (Qpid JMS) subscriber.
+    protected Connection connection1; // subscribes to the advisory topic
+    protected Connection connection2; // owns the queue consumer whose creation/removal is being advertised
+    // Sets the inherited advisorySupport flag before delegating; presumably read while the broker is configured — confirm in AmqpTestSupport.
+    @Override
+    public void setUp() throws Exception {
+        advisorySupport = true;
+        super.setUp();
+    }
+    // Expects a ConsumerInfo advisory after a queue consumer is created and a RemoveInfo advisory after its connection closes.
+    @Test()
+    public void testConnectionAdvisory() throws Exception {
+        connection1 = createAmqpConnection();
+        connection1.start(); // NOTE(review): createAmqpConnection() already calls start(); this looks redundant.
+        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination advisoryTopic = session1.createTopic("ActiveMQ.Advisory.Consumer.Queue.workshop.queueA");
+        MessageConsumer advisoryTopicConsumer = session1.createConsumer(advisoryTopic);
+
+        // Second connection: the consumer it creates on workshop.queueA is what should trigger the first advisory.
+        connection2 = createAmqpConnection();
+        connection2.start(); // NOTE(review): same redundant start() as for connection1.
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session2.createQueue("workshop.queueA");
+        session2.createConsumer(queue);
+        // NOTE(review): the 100 ms receive timeouts below look tight for slow CI hosts — confirm they are not flaky.
+        Message connectMessage = advisoryTopicConsumer.receive(100);
+        assertNotNull(connectMessage);
+        assertEquals("ConsumerInfo", connectMessage.getStringProperty("ActiveMqDataStructureType"));
+        // Closing connection2 is what the test relies on to produce the RemoveInfo advisory.
+        connection2.close();
+
+        Message removeMessage = advisoryTopicConsumer.receive(100);
+        assertNotNull(removeMessage);
+        assertEquals("RemoveInfo", removeMessage.getStringProperty("ActiveMqDataStructureType"));
+        connection1.close(); // NOTE(review): not reached if an assertion above fails — confirm teardown closes both connections.
+    }
+    // Creates and starts a Qpid JMS connection to amqpURI; exceptions reported to the listener are printed via printStackTrace.
+    public Connection createAmqpConnection() throws JMSException {
+        final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(Throwable::printStackTrace);
+        connection.start();
+        return connection;
+    }
+}
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 35f607fa7..b25e802fc 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -71,6 +71,7 @@ public class AmqpTestSupport {
     protected Vector<Throwable> exceptions = new Vector<>();
     protected int numberOfMessages;
 
+    protected boolean advisorySupport = false;
     protected URI amqpURI;
     protected int amqpPort;
     protected URI amqpSslURI;
@@ -118,7 +119,7 @@ public class AmqpTestSupport {
             brokerService.setPersistenceAdapter(kaha);
         }
         brokerService.setSchedulerSupport(isSchedulerEnabled());
-        brokerService.setAdvisorySupport(false);
+        brokerService.setAdvisorySupport(advisorySupport);
         brokerService.setUseJmx(isUseJmx());
         brokerService.getManagementContext().setCreateConnector(false);