You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/14 20:18:09 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1348 Support LVQ for AMQP
Repository: activemq-artemis
Updated Branches:
refs/heads/master 69e345f29 -> 9e624ba58
ARTEMIS-1348 Support LVQ for AMQP
Add support for LVQ, using the same property key as core "_AMQ_LVQ_NAME"
Add test case for AMQP LVQ.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/26752a7a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/26752a7a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/26752a7a
Branch: refs/heads/master
Commit: 26752a7aafa5651e41abf23ac550c6c09bb08287
Parents: 69e345f
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Fri Aug 11 21:27:49 2017 +0100
Committer: Michael Andre Pearce <Mi...@me.com>
Committed: Sat Aug 12 23:43:37 2017 +0100
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 6 +
.../integration/amqp/JMSClientTestSupport.java | 53 +++++++
.../tests/integration/amqp/JMSLVQTest.java | 152 +++++++++++++++++++
3 files changed, 211 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26752a7a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index db39c06..b475208 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -62,6 +62,7 @@ import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
+ // String form of the core Message.HDR_LAST_VALUE_NAME key, so AMQP messages use the same last-value property name as core messages.
+ public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString();
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
public static final int MAX_MESSAGE_PRIORITY = 9;
@@ -1001,6 +1002,11 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
+ public SimpleString getLastValueProperty() {
+ // Looks up the property stored under HDR_LAST_VALUE_NAME and returns it as a SimpleString.
+ // NOTE(review): presumably yields null when the property is absent - confirm against getSimpleStringProperty.
+ return getSimpleStringProperty(HDR_LAST_VALUE_NAME);
+ }
+
+ @Override
public SimpleString getReplyTo() {
if (getProperties() != null) {
return SimpleString.toSimpleString(getProperties().getReplyTo());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26752a7a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index 3f96711..190dd78 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -23,6 +23,7 @@ import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.logging.Logger;
@@ -206,4 +207,56 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return connection;
}
+
+ /**
+  * Builds the broker URI handed to the OpenWire JMS connection factory.
+  *
+  * @return a {@code tcp://127.0.0.1:<AMQP_PORT>} URI followed by the options from
+  *         {@code getJmsConnectionURIOptions()}, or by {@code wireFormat.cacheEnabled=true}
+  *         when no options are configured
+  * @throws RuntimeException wrapping any exception raised while the URI is assembled
+  */
+ protected String getBrokerOpenWireJMSConnectionString() {
+    try {
+       // NOTE(review): isUseSSL() used to select between two identical "tcp://" URIs, so the
+       // branch was collapsed. An SSL-enabled OpenWire test would presumably need a different
+       // scheme here - confirm before relying on SSL with this helper.
+       String uri = "tcp://127.0.0.1:" + AMQP_PORT;
+
+       if (!getJmsConnectionURIOptions().isEmpty()) {
+          uri = uri + "?" + getJmsConnectionURIOptions();
+       } else {
+          uri = uri + "?wireFormat.cacheEnabled=true";
+       }
+
+       return uri;
+    } catch (Exception e) {
+       // Keep the cause so a failure here can be diagnosed from the stack trace.
+       throw new RuntimeException(e);
+    }
+ }
+
+ /**
+  * Creates and starts an OpenWire JMS connection to the test broker, using the URI from
+  * {@code getBrokerOpenWireJMSConnectionString()} with no credentials and no client ID.
+  *
+  * @return the started connection
+  * @throws JMSException if the connection cannot be created or started
+  */
+ protected Connection createOpenWireConnection() throws JMSException {
+    // Delegate to the OpenWire overload; going through createCoreConnection would hand the
+    // OpenWire URI to the Core client and never exercise the OpenWire protocol.
+    return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
+ }
+
+ /**
+  * Creates a JMS connection through the OpenWire {@code ActiveMQConnectionFactory}.
+  *
+  * @param connectionString broker URI passed to the connection factory
+  * @param username user name passed to {@code createConnection}
+  * @param password password passed to {@code createConnection}
+  * @param clientId client ID to set on the connection; skipped when null or empty
+  * @param start whether to start the connection before returning it
+  * @return the connection, after it has been passed through {@code trackJMSConnection}
+  * @throws JMSException if creating, configuring or starting the connection fails
+  */
+ private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionString);
+
+ // NOTE(review): trackJMSConnection presumably registers the connection for cleanup after the test - confirm.
+ Connection connection = trackJMSConnection(factory.createConnection(username, password));
+
+ // Asynchronous connection failures are only printed; they do not fail the test.
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+
+ // The client ID is applied before the connection is started.
+ if (clientId != null && !clientId.isEmpty()) {
+ connection.setClientID(clientId);
+ }
+
+ if (start) {
+ connection.start();
+ }
+
+ return connection;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/26752a7a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
new file mode 100644
index 0000000..b634a39
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.junit.Test;
+
+ /**
+  * Checks last-value-queue (LVQ) behaviour for each producer/consumer pairing of the
+  * connections returned by {@code createConnection()}, {@code createCoreConnection()} and
+  * {@code createOpenWireConnection()} (AMQP, Core and OpenWire according to the test names).
+  *
+  * Each test sends two messages carrying the same last-value key and expects the first
+  * message a consumer receives to be the second one sent.
+  */
+ public class JMSLVQTest extends JMSClientTestSupport {
+
+    /** Name shared by the LVQ address, its queue and the address-settings match. */
+    private static final String LVQ_QUEUE_NAME = "LVQ";
+
+    @Override
+    protected String getConfiguredProtocols() {
+       return "AMQP,OPENWIRE,CORE";
+    }
+
+    @Override
+    protected void addConfiguration(ActiveMQServer server) {
+       // Mark everything matching the LVQ name as a last-value queue.
+       server.getAddressSettingsRepository().addMatch(LVQ_QUEUE_NAME, new AddressSettings().setLastValueQueue(true));
+    }
+
+    @Override
+    protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+       super.createAddressAndQueues(server);
+       final SimpleString lvqName = SimpleString.toSimpleString(LVQ_QUEUE_NAME);
+       server.addAddressInfo(new AddressInfo(lvqName, RoutingType.ANYCAST));
+       // Address and queue share the same name; the literal "LVQ" was replaced by the constant.
+       server.createQueue(lvqName, RoutingType.ANYCAST, lvqName, null, true, false, -1, false, true);
+    }
+
+    @Test
+    public void testLVQAMQPProducerAMQPConsumer() throws Exception {
+       Connection producerConnection = createConnection();
+       Connection consumerConnection = createConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQCoreProducerCoreConsumer() throws Exception {
+       Connection producerConnection = createCoreConnection();
+       Connection consumerConnection = createCoreConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQCoreProducerAMQPConsumer() throws Exception {
+       Connection producerConnection = createCoreConnection();
+       Connection consumerConnection = createConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQAMQPProducerCoreConsumer() throws Exception {
+       Connection producerConnection = createConnection();
+       Connection consumerConnection = createCoreConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQOpenWireProducerOpenWireConsumer() throws Exception {
+       Connection producerConnection = createOpenWireConnection();
+       Connection consumerConnection = createOpenWireConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQCoreProducerOpenWireConsumer() throws Exception {
+       Connection producerConnection = createCoreConnection();
+       Connection consumerConnection = createOpenWireConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQOpenWireProducerCoreConsumer() throws Exception {
+       Connection producerConnection = createOpenWireConnection();
+       Connection consumerConnection = createCoreConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQAMQPProducerOpenWireConsumer() throws Exception {
+       Connection producerConnection = createConnection();
+       Connection consumerConnection = createOpenWireConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    @Test
+    public void testLVQOpenWireProducerAMQPConsumer() throws Exception {
+       Connection producerConnection = createOpenWireConnection();
+       Connection consumerConnection = createConnection();
+       testLVQ(producerConnection, consumerConnection);
+    }
+
+    /**
+     * Sends two text messages with the same last-value key to the LVQ and asserts that the
+     * first message the consumer receives is the second one sent. Both connections are
+     * closed before returning, whether or not the assertions pass.
+     *
+     * @param producerConnection connection used to send the two messages
+     * @param consumerConnection connection used to receive from the LVQ
+     * @throws Exception if any JMS operation fails
+     */
+    public void testLVQ(Connection producerConnection, Connection consumerConnection) throws Exception {
+
+       try {
+          Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          Queue queue1 = producerSession.createQueue(LVQ_QUEUE_NAME);
+          MessageProducer p = producerSession.createProducer(null);
+
+          TextMessage message1 = producerSession.createTextMessage();
+          message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
+          message1.setText("hello");
+          p.send(queue1, message1);
+
+          // Same key as message1, so this message is expected to replace it in the queue.
+          TextMessage message2 = producerSession.createTextMessage();
+          message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY");
+          message2.setText("how are you");
+          p.send(queue1, message2);
+
+          Session consumerSession = consumerConnection.createSession();
+          Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME);
+          MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
+          Message msg = consumer.receive(1000);
+          assertNotNull(msg);
+          assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME));
+          assertTrue(msg instanceof TextMessage);
+          assertEquals("how are you", ((TextMessage)msg).getText());
+          consumer.close();
+
+       } finally {
+          // Nested so a failure closing the producer connection cannot skip closing the consumer one.
+          try {
+             producerConnection.close();
+          } finally {
+             consumerConnection.close();
+          }
+       }
+    }
+ }
\ No newline at end of file
[2/2] activemq-artemis git commit: This closes #1461
Posted by cl...@apache.org.
This closes #1461
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9e624ba5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9e624ba5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9e624ba5
Branch: refs/heads/master
Commit: 9e624ba58109ae04c97a40cb2aecc1d4e71e9d04
Parents: 69e345f 26752a7
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 14 16:17:54 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 14 16:17:54 2017 -0400
----------------------------------------------------------------------
.../protocol/amqp/broker/AMQPMessage.java | 6 +
.../integration/amqp/JMSClientTestSupport.java | 53 +++++++
.../tests/integration/amqp/JMSLVQTest.java | 152 +++++++++++++++++++
3 files changed, 211 insertions(+)
----------------------------------------------------------------------