You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/31 15:55:54 UTC
activemq-artemis git commit: ARTEMIS-2151 JMS Selectors broken in
some cases
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x a2482f56d -> b5c862feb
ARTEMIS-2151 JMS Selectors broken in some cases
Create Test Case
Fix OpenWire so selectors are translated
Fix GroupID to call groupId method
(cherry picked from commit faa6ffa3b4fbaef4e57f9dfc53db0a9fb6843b88)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5c862fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5c862fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5c862fe
Branch: refs/heads/2.6.x
Commit: b5c862febc260e957e20c50452eac23e8bbd0dd2
Parents: a2482f5
Author: Michael André Pearce <mi...@me.com>
Authored: Mon Oct 29 05:46:55 2018 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 11:55:46 2018 -0400
----------------------------------------------------------------------
.../artemis/utils/SelectorTranslator.java | 1 +
.../artemis/api/core/FilterConstants.java | 5 +
.../core/protocol/openwire/amq/AMQConsumer.java | 3 +-
.../artemis/core/filter/impl/FilterImpl.java | 2 +
.../tests/integration/amqp/JMSSelectorTest.java | 162 +++++++++++++++++++
5 files changed, 172 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
index 637cdff..dd391a9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SelectorTranslator.java
@@ -48,6 +48,7 @@ public class SelectorTranslator {
filterString = SelectorTranslator.parse(filterString, "JMSTimestamp", "AMQTimestamp");
filterString = SelectorTranslator.parse(filterString, "JMSMessageID", "AMQUserID");
filterString = SelectorTranslator.parse(filterString, "JMSExpiration", "AMQExpiration");
+ filterString = SelectorTranslator.parse(filterString, "JMSXGroupID", "AMQGroupID");
return filterString;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
index 37b221c..3803a54 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java
@@ -68,6 +68,11 @@ public final class FilterConstants {
public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress");
/**
+ * Name of the ActiveMQ Artemis Message group id header.
+ */
+ public static final SimpleString ACTIVEMQ_GROUP_ID = new SimpleString("AMQGroupID");
+
+ /**
* All ActiveMQ Artemis headers are prepended by this prefix.
*/
public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index fae6ef7..1878937 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -117,7 +118,7 @@ public class AMQConsumer {
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
- SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
+ SimpleString selector = info.getSelector() == null ? null : new SimpleString(SelectorTranslator.convertToActiveMQFilterString(info.getSelector()));
boolean preAck = false;
if (info.isNoLocal()) {
if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
index 7ee7b6b..560e53a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java
@@ -176,6 +176,8 @@ public class FilterImpl implements Filter {
return msg.getEncodeSize();
} else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) {
return msg.getAddress();
+ } else if (FilterConstants.ACTIVEMQ_GROUP_ID.equals(fieldName)) {
+ return msg.getGroupID();
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5c862fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
new file mode 100644
index 0000000..c61898f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSelectorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.junit.Test;
+
+public class JMSSelectorTest extends JMSClientTestSupport {
+
+ private static final String NORMAL_QUEUE_NAME = "NORMAL";
+
+ private ConnectionSupplier AMQPConnection = () -> createConnection();
+ private ConnectionSupplier CoreConnection = () -> createCoreConnection();
+ private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ server.getConfiguration().setPersistenceEnabled(false);
+ server.getAddressSettingsRepository().addMatch(NORMAL_QUEUE_NAME, new AddressSettings());
+ }
+
+ @Override
+ protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+ super.createAddressAndQueues(server);
+
+ //Add Standard Queue
+ server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST));
+ server.createQueue(SimpleString.toSimpleString(NORMAL_QUEUE_NAME), RoutingType.ANYCAST, SimpleString.toSimpleString(NORMAL_QUEUE_NAME), null, true, false, -1, false, true);
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerAMQPConsumer() throws Exception {
+ testJMSSelectors(AMQPConnection, AMQPConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerCoreConsumer() throws Exception {
+ testJMSSelectors(CoreConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerAMQPConsumer() throws Exception {
+ testJMSSelectors(CoreConnection, AMQPConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerCoreConsumer() throws Exception {
+ testJMSSelectors(AMQPConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerOpenWireConsumer() throws Exception {
+ testJMSSelectors(OpenWireConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsCoreProducerOpenWireConsumer() throws Exception {
+ testJMSSelectors(CoreConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerCoreConsumer() throws Exception {
+ testJMSSelectors(OpenWireConnection, CoreConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsAMQPProducerOpenWireConsumer() throws Exception {
+ testJMSSelectors(AMQPConnection, OpenWireConnection);
+ }
+
+ @Test
+ public void testJMSSelectorsOpenWireProducerAMQPConsumer() throws Exception {
+ testJMSSelectors(OpenWireConnection, AMQPConnection);
+ }
+
+ public void testJMSSelectors(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("color", "blue"), "color = 'blue'");
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setJMSCorrelationID("correlation"), "JMSCorrelationID = 'correlation'");
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, null, "JMSPriority = 1", Message.DEFAULT_DELIVERY_MODE, 1, Message.DEFAULT_TIME_TO_LIVE);
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, NORMAL_QUEUE_NAME, message -> message.setStringProperty("JMSXGroupID", "groupA"), "JMSXGroupID = 'groupA'");
+ }
+
+ public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector) throws Exception {
+ testJMSSelector(producerConnectionSupplier, consumerConnectionSupplier, queueName, setValue, selector, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ public void testJMSSelector(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier, String queueName, MessageSetter setValue, String selector, int deliveryMode, int priority, long timeToLive) throws Exception {
+
+ sendMessage(producerConnectionSupplier, queueName, setValue, deliveryMode, priority, timeToLive);
+
+ receiveLVQ(consumerConnectionSupplier, queueName, selector);
+ }
+
+ private void receiveLVQ(ConnectionSupplier consumerConnectionSupplier, String queueName, String selector) throws JMSException {
+ try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
+
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue consumerQueue = consumerSession.createQueue(queueName);
+ MessageConsumer consumer = consumerSession.createConsumer(consumerQueue, selector);
+ TextMessage msg = (TextMessage) consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals("how are you", msg.getText());
+ assertNull(consumer.receive(1000));
+ consumer.close();
+ }
+ }
+
+ private void sendMessage(ConnectionSupplier producerConnectionSupplier, String queueName, MessageSetter setValue, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ try (Connection producerConnection = producerConnectionSupplier.createConnection()) {
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue1 = producerSession.createQueue(queueName);
+ MessageProducer p = producerSession.createProducer(null);
+
+ TextMessage message1 = producerSession.createTextMessage();
+ message1.setText("hello");
+ p.send(queue1, message1);
+
+ TextMessage message2 = producerSession.createTextMessage();
+ if (setValue != null) {
+ setValue.accept(message2);
+ }
+ message2.setText("how are you");
+ p.send(queue1, message2, deliveryMode, priority, timeToLive);
+ }
+ }
+
+ public interface MessageSetter {
+
+ void accept(javax.jms.Message message) throws JMSException;
+ }
+}
\ No newline at end of file