You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/10/13 03:55:13 UTC
[activemq] branch activemq-5.16.x updated: Fix serialization of RemoveInfo advisory message for AMQP consumers
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
new 9e127bd39 Fix serialization of RemoveInfo advisory message for AMQP consumers
9e127bd39 is described below
commit 9e127bd39086c270d4c36a13c294c9dae600800c
Author: Lucas Tétreault <te...@amazon.com>
AuthorDate: Wed Oct 12 19:01:06 2022 -0700
Fix serialization of RemoveInfo advisory message for AMQP consumers
(cherry picked from commit e0a37a5c304367e693d180b014f679ad68ca0b80)
---
.../message/JMSMappingOutboundTransformer.java | 2 +-
.../activemq/transport/amqp/AmqpAdvisoryTest.java | 78 ++++++++++++++++++++++
.../activemq/transport/amqp/AmqpTestSupport.java | 3 +-
3 files changed, 81 insertions(+), 2 deletions(-)
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 67d034447..dbae00f99 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -417,7 +417,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
- removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
+ removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId().getValue());
}
body = new AmqpValue(removeMap);
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java
new file mode 100644
index 000000000..5b32c54d9
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAdvisoryTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.amqp;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Checks that consumer advisories, including RemoveInfo, are delivered to an AMQP subscriber. */
+public class AmqpAdvisoryTest extends AmqpTestSupport {
+    private static final long RECEIVE_TIMEOUT_MS = 5000; // upper bound only; receive() returns on arrival
+
+    protected Connection connection1;
+    protected Connection connection2;
+
+    @Override
+    public void setUp() throws Exception {
+        advisorySupport = true; // AmqpTestSupport passes this to BrokerService.setAdvisorySupport
+        super.setUp();
+    }
+
+    @Test()
+    public void testConnectionAdvisory() throws Exception {
+        // Subscribe to consumer advisories for the queue (createAmqpConnection() already starts the connection).
+        connection1 = createAmqpConnection();
+        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination advisoryTopic = session1.createTopic("ActiveMQ.Advisory.Consumer.Queue.workshop.queueA");
+        MessageConsumer advisoryTopicConsumer = session1.createConsumer(advisoryTopic);
+        // A consumer on the watched queue should produce a ConsumerInfo advisory.
+        connection2 = createAmqpConnection();
+        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session2.createQueue("workshop.queueA");
+        session2.createConsumer(queue);
+        Message connectMessage = advisoryTopicConsumer.receive(RECEIVE_TIMEOUT_MS);
+        assertNotNull(connectMessage);
+        assertEquals("ConsumerInfo", connectMessage.getStringProperty("ActiveMqDataStructureType"));
+        // Closing that consumer's connection should produce a RemoveInfo advisory.
+        connection2.close();
+        Message removeMessage = advisoryTopicConsumer.receive(RECEIVE_TIMEOUT_MS);
+        assertNotNull(removeMessage);
+        assertEquals("RemoveInfo", removeMessage.getStringProperty("ActiveMqDataStructureType"));
+        connection1.close();
+    }
+
+    /** Creates and starts a Qpid JMS connection to {@code amqpURI}; listener errors go to stderr. */
+    public Connection createAmqpConnection() throws JMSException {
+        final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(Throwable::printStackTrace);
+        connection.start();
+        return connection;
+    }
+}
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 73a22cc25..27efe99f3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -71,6 +71,7 @@ public class AmqpTestSupport {
protected Vector<Throwable> exceptions = new Vector<>();
protected int numberOfMessages;
+ protected boolean advisorySupport = false;
protected URI amqpURI;
protected int amqpPort;
protected URI amqpSslURI;
@@ -119,7 +120,7 @@ public class AmqpTestSupport {
brokerService.setPersistenceAdapter(kaha);
}
brokerService.setSchedulerSupport(isSchedulerEnabled());
- brokerService.setAdvisorySupport(false);
+ brokerService.setAdvisorySupport(advisorySupport);
brokerService.setUseJmx(isUseJmx());
brokerService.getManagementContext().setCreateConnector(false);