You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/31 15:55:54 UTC

activemq-artemis git commit: ARTEMIS-2151 JMS Selectors broken in some cases

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x a2482f56d -> b5c862feb


ARTEMIS-2151 JMS Selectors broken in some cases

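Create a test case covering AMQP, Core and OpenWire producer/consumer combinations
Fix OpenWire so consumer selectors are translated to ActiveMQ filter strings
Fix JMSXGroupID selectors so the filter reads the value via the message's getGroupID method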
(cherry picked from commit faa6ffa3b4fbaef4e57f9dfc53db0a9fb6843b88)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5c862fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5c862fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5c862fe

Branch: refs/heads/2.6.x
Commit: b5c862febc260e957e20c50452eac23e8bbd0dd2
Parents: a2482f5
Author: Michael André Pearce <mi...@me.com>
Authored: Mon Oct 29 05:46:55 2018 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 11:55:46 2018 -0400

----------------------------------------------------------------------
 .../artemis/utils/SelectorTranslator.java       |   1 +
 .../artemis/api/core/FilterConstants.java       |   5 +
 .../core/protocol/openwire/amq/AMQConsumer.java |   3 +-
 .../artemis/core/filter/impl/FilterImpl.java    |   2 +
 .../tests/integration/amqp/JMSSelectorTest.java | 162 +++++++++++++++++++
 5 files changed, 172 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
index 637cdff..dd391a9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
@@ -48,6 +48,7 @@ public class SelectorTranslator {
       filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp");
       filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID");
       filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "AMQExpiration");
+      filterString = SelectorTranslator.parse(filterString, "JMSXGroupID", "AMQGroupID");
 
       return filterString;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
index 37b221c..3803a54 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
@@ -68,6 +68,11 @@ public final class FilterConstants {
    public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
 
    /**
+    * Name of the ActiveMQ Artemis Message group id header.
+    */
+   public static final SimpleString ACTIVEMQ_GROUP_ID = new SimpleString("AMQGroupID");
+
+   /**
     * All ActiveMQ Artemis headers are prepended by this prefix.
     */
    public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index fae6ef7..1878937 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -117,7 +118,7 @@ public class AMQConsumer {
 
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
 
-      SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
+      SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
       boolean preAck = false;
       if (info.isNoLocal()) {
          if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 7ee7b6b..560e53a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -176,6 +176,8 @@ public class FilterImpl implements Filter {
          return msg.getEncodeSize();
       } else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
          return msg.getAddress();
+      } else if (FilterConstants.ACTIVEMQ_GROUP_ID.equals(fieldName)) {
+         return msg.getGroupID();
       } else {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
new file mode 100644
index 0000000..c61898f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Test;
+
+/** Verifies JMS selectors for every AMQP, Core and OpenWire producer/consumer pairing. */
+public class JMSSelectorTest extends JMSClientTestSupport {
+
+   private static final String NORMAL_QUEUE_NAME = "NORMAL";
+
+   // One connection supplier per client type; the tests pair them as producer and consumer.
+   private final ConnectionSupplier amqpConnection = () -> createConnection();
+   private final ConnectionSupplier coreConnection = () -> createCoreConnection();
+   private final ConnectionSupplier openWireConnection = () -> createOpenWireConnection();
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setPersistenceEnabled(false);
+      server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new AddressSettings());
+   }
+
+   @Override
+   protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+      super.createAddressAndQueues(server);
+
+      //Add Standard Queue
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST));
+      server.createQueue(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString(NORMAL_QUEUE_NAME), null, true, false, -1, false, true);
+   }
+
+   @Test
+   public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception {
+      testJMSSelectors(amqpConnection, amqpConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception {
+      testJMSSelectors(coreConnection, coreConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception {
+      testJMSSelectors(coreConnection, amqpConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception {
+      testJMSSelectors(amqpConnection, coreConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws Exception {
+      testJMSSelectors(openWireConnection, openWireConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception {
+      testJMSSelectors(coreConnection, openWireConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception {
+      testJMSSelectors(openWireConnection, coreConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception {
+      testJMSSelectors(amqpConnection, openWireConnection);
+   }
+
+   @Test
+   public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception {
+      testJMSSelectors(openWireConnection, amqpConnection);
+   }
+
+   /** Runs each selector scenario (string property, correlation id, priority, group id) for the given pairing. */
+   public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
+      testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("color", "blue"), "color = 'blue'");
+      testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setJMSCorrelationID("correlation"), "JMSCorrelationID = 'correlation'");
+      testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, null, "JMSPriority = 1", Message.DEFAULT_DELIVERY_MODE, 1, Message.DEFAULT_TIME_TO_LIVE);
+      testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("JMSXGroupID", "groupA"), "JMSXGroupID = 'groupA'");
+   }
+
+   /** Same as the full overload, sending with the default delivery mode, priority and time to live. */
+   public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector) throws Exception {
+      testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, queueName, setValue, selector, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+   }
+
+   /** Sends a decoy plus one prepared message, then asserts the selector delivers only the prepared one. */
+   public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector, int deliveryMode, int priority, long timeToLive) throws Exception {
+      sendMessage(producerConnectionSupplier, queueName, setValue, deliveryMode, priority, timeToLive);
+      receiveMatching(consumerConnectionSupplier, queueName, selector);
+   }
+
+   /** Consumes with {@code selector}, expecting exactly one message whose text is "how are you". */
+   private void receiveMatching(ConnectionSupplier consumerConnectionSupplier, String queueName, String selector) throws JMSException {
+      try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
+         Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(queueName), selector);
+         TextMessage msg = (TextMessage) consumer.receive(1000);
+         assertNotNull(msg);
+         assertEquals("how are you", msg.getText());
+         assertNull(consumer.receive(1000));
+         consumer.close();
+      }
+   }
+
+   /** Sends an unmodified "hello" message, then the "how are you" message prepared by {@code setValue}. */
+   private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, MessageSetter setValue, int deliveryMode, int priority, long timeToLive) throws JMSException {
+      try (Connection producerConnection = producerConnectionSupplier.createConnection()) {
+         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = producerSession.createQueue(queueName);
+         MessageProducer producer = producerSession.createProducer(null);
+         // Decoy sent with the producer defaults and no properties; the consumer must never receive it.
+         producer.send(queue, producerSession.createTextMessage("hello"));
+         TextMessage prepared = producerSession.createTextMessage("how are you");
+         if (setValue != null) {
+            setValue.accept(prepared);
+         }
+         producer.send(queue, prepared, deliveryMode, priority, timeToLive);
+      }
+   }
+
+   /** Callback that customises a message before it is sent; may throw {@link JMSException}. */
+   @FunctionalInterface
+   public interface MessageSetter {
+      void accept(javax.jms.Message message) throws JMSException;
+   }
+}
\ No newline at end of file