You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/03/06 02:38:59 UTC

[activemq-artemis] branch master updated: ARTEMIS-2639 lost notif props with OpenWire+divert

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e9ee995  ARTEMIS-2639 lost notif props with OpenWire+divert
     new 8927d07  This closes #3001
e9ee995 is described below

commit e9ee9956bd0d879e0ac241547384bede848d132d
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Mar 4 19:24:26 2020 -0600

    ARTEMIS-2639 lost notif props with OpenWire+divert
    
    Historically speaking, all message properties starting with _AMQ or __HDR_
    would not be passed to OpenWire messages. However, that blocked the
    properties from management notifications so ARTEMIS-1209 was raised and
    the solution there was to pass properties that started with _AMQ *if*
    the consumer was connected to the management notification address.
    However, in this case messages are diverted to a different address so
    this check fails and the properties are removed. My solution will be to
    check the message itself to see if it has the _AMQ_NotifType property
    (which all notification messages do) rather than checking where the
    consumer is connected.
---
 .../openwire/OpenWireMessageConverter.java         |  3 +-
 .../tests/integration/divert/DivertTest.java       | 40 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index cfc4aa6..5f6570d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
@@ -943,7 +944,7 @@ public final class OpenWireMessageConverter {
                                                  final AMQConsumer consumer) throws IOException {
       for (SimpleString s : props) {
          final String keyStr = s.toString();
-         if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
+         if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
             continue;
          }
          final Object prop = coreMessage.getObjectProperty(s);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 75b4cec..f851872 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -23,9 +23,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Message;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -51,6 +55,7 @@ import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,6 +64,41 @@ public class DivertTest extends ActiveMQTestBase {
    private static final int TIMEOUT = 3000;
 
    @Test
+   public void testDivertedNotificationMessagePropertiesOpenWire() throws Exception {
+      final String testAddress = ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress().toString();
+
+      final String forwardAddress = "forwardAddress";
+
+      DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setFilterString("_AMQ_NotifType = 'CONSUMER_CREATED' OR _AMQ_NotifType = 'CONSUMER_CLOSED'");
+
+      Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+      server.start();
+
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+
+      connectionFactory.setClientID("myClientID");
+
+      Topic forwardTopic = new ActiveMQTopic(forwardAddress);
+      Connection connection = connectionFactory.createConnection();
+
+      connection.start();
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber subscriber = session.createDurableSubscriber(forwardTopic, "mySubscriptionName");
+
+      javax.jms.Message message = subscriber.receive(DivertTest.TIMEOUT);
+
+      connection.close();
+
+      Assert.assertNotNull(message);
+
+      Assert.assertEquals("CONSUMER_CREATED", message.getStringProperty("_AMQ_NotifType"));
+   }
+
+   @Test
    public void testSingleNonExclusiveDivert() throws Exception {
       final String testAddress = "testAddress";