You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/09/22 00:21:27 UTC
[1/3] activemq-artemis git commit: ARTEMIS-738 Improving TX support
on AMQP
Repository: activemq-artemis
Updated Branches:
refs/heads/master 5ea53c48e -> e790c7858
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
new file mode 100644
index 0000000..e84534f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test various aspects of Transaction support.
+ */
+public class AmqpTransactionTest extends AmqpClientTestSupport {
+
+ @Before
+ public void createQueue() throws Exception {
+ server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false);
+ }
+
+ @Test(timeout = 30000)
+ public void testBeginAndCommitTransaction() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+
+ session.begin();
+ assertTrue(session.isInTransaction());
+ session.commit();
+
+ connection.close();
+ }
+
+ @Test(timeout = 30000)
+ public void testBeginAndRollbackTransaction() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+
+ session.begin();
+ assertTrue(session.isInTransaction());
+ session.rollback();
+
+ connection.close();
+
+ System.err.println("Closed");
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageToQueueWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ final Queue queue = getProxyToQueue(getTestName());
+
+ session.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ assertEquals(0, queue.getMessageCount());
+
+ session.commit();
+
+ assertEquals(1, queue.getMessageCount());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageToQueueWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ final Queue queue = getProxyToQueue(getTestName());
+
+ session.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ assertEquals(0, queue.getMessageCount());
+
+ session.rollback();
+
+ assertEquals(0, queue.getMessageCount());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ final Queue queue = getProxyToQueue(getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ assertEquals(1, queue.getMessageCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ session.begin();
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ session.commit();
+
+ assertEquals(0, queue.getMessageCount());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveAfterConnectionClose() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ final Queue queue = getProxyToQueue(getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ assertEquals(1, queue.getMessageCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ session.begin();
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ // this will force a rollback on the TX (It should at least)
+ connection.close();
+
+ connection = addConnection(client.connect());
+ session = connection.createSession();
+ receiver = session.createReceiver(getTestName());
+ session.begin();
+ receiver.flow(1);
+
+ received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ session.commit();
+
+ assertEquals(0, queue.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveMessageWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ final Queue queue = getProxyToQueue(getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ assertEquals(1, queue.getMessageCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ session.begin();
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ session.rollback();
+
+ assertEquals(1, queue.getMessageCount());
+
+ sender.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Load up the Queue with some messages
+ {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.send(message);
+ sender.send(message);
+ sender.close();
+ }
+
+ // Root TXN session controls the lifetime of the TXN shared by all receivers.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some receiver sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Receiver linked to each session
+ AmqpReceiver receiver1 = session1.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session2.createReceiver(getTestName());
+ AmqpReceiver receiver3 = session3.createReceiver(getTestName());
+
+ final Queue queue = getProxyToQueue(getTestName());
+ assertEquals(3, queue.getMessageCount());
+
+ // Begin the transaction that all receivers will operate in.
+ txnSession.begin();
+
+ assertTrue(txnSession.isInTransaction());
+
+ receiver1.flow(1);
+ receiver2.flow(1);
+ receiver3.flow(1);
+
+ AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+ message1.accept(txnSession);
+ message2.accept(txnSession);
+ message3.accept(txnSession);
+
+ assertEquals(3, queue.getMessageCount());
+
+ txnSession.commit();
+
+ assertEquals(0, queue.getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Load up the Queue with some messages
+ {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.send(message);
+ sender.send(message);
+ sender.close();
+ }
+
+ // Root TXN session controls the lifetime of the TXN shared by all receivers.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some receiver sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Receiver linked to each session
+ AmqpReceiver receiver1 = session1.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session2.createReceiver(getTestName());
+ AmqpReceiver receiver3 = session3.createReceiver(getTestName());
+
+ final Queue queue = getProxyToQueue(getTestName());
+ assertEquals(3, queue.getMessageCount());
+
+ // Begin the transaction that all receivers will operate in.
+ txnSession.begin();
+
+ assertTrue(txnSession.isInTransaction());
+
+ receiver1.flow(1);
+ receiver2.flow(1);
+ receiver3.flow(1);
+
+ AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+ message1.accept(txnSession);
+ message2.accept(txnSession);
+ message3.accept(txnSession);
+
+ assertEquals(3, queue.getMessageCount());
+
+ txnSession.rollback();
+
+ assertEquals(3, queue.getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpSender sender1 = session1.createSender(getTestName());
+ AmqpSender sender2 = session2.createSender(getTestName());
+ AmqpSender sender3 = session3.createSender(getTestName());
+
+ final Queue queue = getProxyToQueue(getTestName());
+ assertEquals(0, queue.getMessageCount());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+
+ assertTrue(txnSession.isInTransaction());
+
+ sender1.send(message, txnSession.getTransactionId());
+ sender2.send(message, txnSession.getTransactionId());
+ sender3.send(message, txnSession.getTransactionId());
+
+ assertEquals(0, queue.getMessageCount());
+
+ txnSession.commit();
+
+ assertEquals(3, queue.getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpSender sender1 = session1.createSender(getTestName());
+ AmqpSender sender2 = session2.createSender(getTestName());
+ AmqpSender sender3 = session3.createSender(getTestName());
+
+ final Queue queue = getProxyToQueue(getTestName());
+ assertEquals(0, queue.getMessageCount());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+
+ assertTrue(txnSession.isInTransaction());
+
+ sender1.send(message, txnSession.getTransactionId());
+ sender2.send(message, txnSession.getTransactionId());
+ sender3.send(message, txnSession.getTransactionId());
+
+ assertEquals(0, queue.getMessageCount());
+
+ txnSession.rollback();
+
+ assertEquals(0, queue.getMessageCount());
+ }
+
+ //----- Tests Ported from AmqpNetLite client -----------------------------//
+
+ @Test(timeout = 60000)
+ public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+ final int NUM_MESSAGES = 5;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Normal Session which won't create a TXN itself
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ // Commit TXN work from a sender.
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.commit();
+
+ // Rollback an additional batch of TXN work from a sender.
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.rollback();
+
+ // Commit more TXN work from a sender.
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.commit();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(NUM_MESSAGES * 2);
+ for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept(txnSession);
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+ final int NUM_MESSAGES = 10;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Normal Session which won't create a TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read the first NUM_MESSAGES messages from the Queue (one more was sent), do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+ receiver.flow((NUM_MESSAGES + 2) * 2);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ messages.add(message);
+ }
+
+ // Commit half the consumed messages
+ txnSession.begin();
+ for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.commit();
+
+ // Roll back the other half of the consumed messages
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ messages.get(i).accept(txnSession);
+ }
+ txnSession.rollback();
+
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ // Commit the other half of the consumed messages
+ // This is a variation from the .NET client tests which doesn't settle the
+ // messages in the TX until commit is called but on ActiveMQ they will be
+ // redispatched regardless and not stay in the acquired state.
+ txnSession.begin();
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ }
+ txnSession.commit();
+
+ // The final message should still be pending.
+ {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ receiver.flow(1);
+ assertNotNull(message);
+ assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+ message.release();
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+ final int NUM_MESSAGES = 10;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Normal Session which won't create a TXN itself
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getTestName());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", i);
+ sender.send(message, txnSession.getTransactionId());
+ }
+
+ // Read the first two messages from the Queue, do not accept them yet.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(2);
+ AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+ // Accept the first one in a TXN and send a new message in that TXN as well
+ txnSession.begin();
+ {
+ message1.accept(txnSession);
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.commit();
+
+ // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+ txnSession.begin();
+ {
+ message2.accept(txnSession);
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+ sender.send(message, txnSession.getTransactionId());
+ }
+ txnSession.rollback();
+
+ // Variation here from .NET code, the client settles the accepted message where
+ // the .NET client does not and instead releases here to have it redelivered.
+
+ receiver.flow(NUM_MESSAGES);
+ for (int i = 1; i <= NUM_MESSAGES; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(i, message.getApplicationProperty("msgId"));
+ message.accept();
+ }
+
+ // Should be nothing left.
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 193b46b..984459e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -815,7 +815,8 @@ public class ProtonTest extends ProtonTestBase {
request.setText("[]");
sender.send(request);
- AmqpMessage response = receiver.receive();
+ AmqpMessage response = receiver.receive(50, TimeUnit.SECONDS);
+ Assert.assertNotNull(response);
assertNotNull(response);
Object section = response.getWrappedMessage().getBody();
assertTrue(section instanceof AmqpValue);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
index 0acd4ae..cec59da 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java
@@ -41,7 +41,6 @@ public class ProtonTestBase extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
index b4a6a34..d23bd5b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
@@ -43,7 +43,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
@@ -61,8 +60,6 @@ public class ProtonTestForHeader extends ActiveMQTestBase {
@After
public void tearDown() throws Exception {
try {
- Thread.sleep(250);
-
server.stop();
}
finally {
[2/3] activemq-artemis git commit: ARTEMIS-738 Improving TX support
on AMQP
Posted by jb...@apache.org.
ARTEMIS-738 Improving TX support on AMQP
https://issues.apache.org/jira/browse/ARTEMIS-738
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/113c0c93
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/113c0c93
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/113c0c93
Branch: refs/heads/master
Commit: 113c0c9360197ef3467f3907a604fa527247c858
Parents: 5ea53c4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 15 15:28:07 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 21 18:14:38 2016 -0400
----------------------------------------------------------------------
.../plug/ActiveMQProtonConnectionCallback.java | 71 ++-
.../plug/ProtonSessionIntegrationCallback.java | 84 ++-
.../org/proton/plug/AMQPConnectionCallback.java | 11 +
.../org/proton/plug/AMQPSessionCallback.java | 18 +-
.../context/AbstractProtonSessionContext.java | 1 -
.../plug/context/ProtonTransactionHandler.java | 8 +-
.../server/ProtonServerReceiverContext.java | 10 +-
.../server/ProtonServerSenderContext.java | 7 +-
.../server/ProtonServerSessionContext.java | 6 +
.../ActiveMQAMQPProtocolMessageBundle.java | 3 +
.../context/AbstractConnectionContextTest.java | 17 +
.../proton/plug/test/invm/ProtonINVMSPI.java | 33 +
.../plug/test/minimalclient/AMQPClientSPI.java | 18 +
.../minimalserver/MinimalConnectionSPI.java | 18 +
.../test/minimalserver/MinimalSessionSPI.java | 22 +-
.../artemis/core/server/ServerSession.java | 2 +
.../core/server/impl/ServerSessionImpl.java | 18 +-
.../transport/amqp/client/AmqpConnection.java | 9 +-
.../transport/amqp/client/AmqpMessage.java | 129 +++-
.../transport/amqp/client/AmqpReceiver.java | 28 +
.../transport/amqp/client/AmqpSender.java | 27 +-
.../transport/amqp/client/AmqpSession.java | 12 +-
.../integration/amqp/AmqpClientTestSupport.java | 194 ++++++
.../integration/amqp/AmqpTransactionTest.java | 625 +++++++++++++++++++
.../tests/integration/proton/ProtonTest.java | 3 +-
.../integration/proton/ProtonTestBase.java | 1 -
.../integration/proton/ProtonTestForHeader.java | 3 -
27 files changed, 1263 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index ea66b01..d5b2ff7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -21,6 +21,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,8 +39,13 @@ import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.jboss.logging.Logger;
@@ -47,7 +54,9 @@ import org.proton.plug.AMQPConnectionContext;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.handler.ExtCapability;
+import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
import org.proton.plug.sasl.AnonymousServerSASL;
import static org.proton.plug.AmqpSupport.CONTAINER_ID;
@@ -55,8 +64,11 @@ import static org.proton.plug.AmqpSupport.INVALID_FIELD;
import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback, FailureListener, CloseListener {
+ private static final Logger logger = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private static final List<String> connectedContainers = Collections.synchronizedList(new ArrayList());
+ private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+
private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
private final ProtonProtocolManager manager;
@@ -117,11 +129,23 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback,
@Override
public void close() {
- if (registeredConnectionId.getAndSet(false)) {
- server.removeClientConnection(remoteContainerId);
+ try {
+ if (registeredConnectionId.getAndSet(false)) {
+ server.removeClientConnection(remoteContainerId);
+ }
+ connection.close();
+ amqpConnection.close();
+ }
+ finally {
+ for (Transaction tx : transactions.values()) {
+ try {
+ tx.rollback();
+ }
+ catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
}
- connection.close();
- amqpConnection.close();
}
public Executor getExeuctor() {
@@ -219,4 +243,43 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback,
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
close();
}
+
+ @Override
+ public Binary newTransaction() {
+ XidImpl xid = newXID();
+ Transaction transaction = new TransactionImpl(xid, server.getStorageManager(), -1);
+ transactions.put(xid, transaction);
+ return new Binary(xid.getGlobalTransactionId());
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ XidImpl xid = newXID(txid.getArray());
+ Transaction tx = transactions.get(xid);
+
+ if (tx == null) {
+ throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+ }
+
+ return tx;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+ XidImpl xid = newXID(txid.getArray());
+ transactions.remove(xid);
+ }
+
+
+ protected XidImpl newXID() {
+ return newXID(UUIDGenerator.getInstance().generateStringUUID().getBytes());
+ }
+
+ protected XidImpl newXID(byte[] bytes) {
+ return new XidImpl("amqp".getBytes(), 1, bytes);
+ }
+
+
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 153d033..da9dd9c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -42,7 +42,6 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -61,6 +60,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.proton.plug.sasl.PlainSASLResult;
@@ -282,46 +282,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
- public Binary getCurrentTXID() {
- Transaction tx = serverSession.getCurrentTransaction();
- if (tx == null) {
- tx = serverSession.newTransaction();
- serverSession.resetTX(tx);
- }
- return new Binary(ByteUtil.longToBytes(tx.getID()));
- }
-
- @Override
public String tempQueueName() {
return UUIDGenerator.getInstance().generateStringUUID();
}
@Override
- public void commitCurrentTX() throws Exception {
- recoverContext();
- try {
- serverSession.commit();
- }
- finally {
- resetContext();
- }
- }
-
- @Override
- public void rollbackCurrentTX(boolean lastMessageDelivered) throws Exception {
- //need to check here as this can be called if init fails
- if (serverSession != null) {
- recoverContext();
- try {
- serverSession.rollback(lastMessageDelivered);
- }
- finally {
- resetContext();
- }
- }
- }
-
- @Override
public void close() throws Exception {
//need to check here as this can be called if init fails
if (serverSession != null) {
@@ -336,10 +301,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
- public void ack(Object brokerConsumer, Object message) throws Exception {
+ public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception {
+ if (transaction == null) {
+ transaction = serverSession.getCurrentTransaction();
+ }
recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualAcknowledge(serverSession.getCurrentTransaction(), ((ServerMessage) message).getMessageID());
+ ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID());
}
finally {
resetContext();
@@ -363,7 +331,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
@Override
- public void serverSend(final Receiver receiver,
+ public void serverSend(final Transaction transaction,
+ final Receiver receiver,
final Delivery delivery,
String address,
int messageFormat,
@@ -382,10 +351,10 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
if (store.isRejectingMessages()) {
// We drop pre-settled messages (and abort any associated Tx)
if (delivery.remotelySettled()) {
- if (serverSession.getCurrentTransaction() != null) {
+ if (transaction != null) {
String amqpAddress = delivery.getLink().getTarget().getAddress();
ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
- serverSession.getCurrentTransaction().markAsRollbackOnly(e);
+ transaction.markAsRollbackOnly(e);
}
}
else {
@@ -393,7 +362,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
}
}
else {
- serverSend(message, delivery, receiver);
+ serverSend(transaction, message, delivery, receiver);
}
}
@@ -406,11 +375,11 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
connection.flush();
}
- private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
+ private void serverSend(final Transaction transaction, final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
try {
message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
- serverSession.send(message, false);
+ serverSession.send(transaction, message, false, false);
// FIXME Potential race here...
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@@ -543,4 +512,31 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
return false;
}
}
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return protonSPI.getTransaction(txid);
+ }
+
+ @Override
+ public Binary newTransaction() {
+ return protonSPI.newTransaction();
+ }
+
+
+ @Override
+ public void commitTX(Binary txid) throws Exception {
+ Transaction tx = protonSPI.getTransaction(txid);
+ tx.commit(true);
+ protonSPI.removeTransaction(txid);
+ }
+
+ @Override
+ public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
+ Transaction tx = protonSPI.getTransaction(txid);
+ tx.rollback();
+ protonSPI.removeTransaction(txid);
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index 15a3246..f4ed64c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -17,7 +17,10 @@
package org.proton.plug;
import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Connection;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
public interface AMQPConnectionCallback {
@@ -44,4 +47,12 @@ public interface AMQPConnectionCallback {
void sendSASLSupported();
boolean validateConnection(Connection connection, SASLResult saslResult);
+
+ Binary newTransaction();
+
+ Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
+
+ void removeTransaction(Binary txid);
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index b6acd3f..5f3b6dd 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -18,11 +18,13 @@ package org.proton.plug;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
/**
* These are methods where the Proton Plug component will call your server
@@ -67,17 +69,20 @@ public interface AMQPSessionCallback {
// This one can be a lot improved
ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception;
- Binary getCurrentTXID();
-
String tempQueueName();
- void commitCurrentTX() throws Exception;
- void rollbackCurrentTX(boolean lastMessageReceived) throws Exception;
+ Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
+
+ Binary newTransaction();
+
+ void commitTX(Binary txid) throws Exception;
+
+ void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception;
void close() throws Exception;
- void ack(Object brokerConsumer, Object message) throws Exception;
+ void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception;
/**
* @param brokerConsumer
@@ -96,7 +101,8 @@ public interface AMQPSessionCallback {
* @param messageFormat
* @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into byte[])
*/
- void serverSend(Receiver receiver,
+ void serverSend(Transaction transaction,
+ Receiver receiver,
Delivery delivery,
String address,
int messageFormat,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
index 96f7413..5c0a626 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
@@ -140,7 +140,6 @@ public abstract class AbstractProtonSessionContext extends ProtonInitializable i
senders.clear();
try {
if (sessionSPI != null) {
- sessionSPI.rollbackCurrentTX(false);
sessionSPI.close();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index 597b5e4..263d3e6 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -72,7 +72,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
Object action = ((AmqpValue) msg.getBody()).getValue();
if (action instanceof Declare) {
- Binary txID = sessionSPI.getCurrentTXID();
+ Binary txID = sessionSPI.newTransaction();
Declared declared = new Declared();
declared.setTxnId(txID);
delivery.disposition(declared);
@@ -80,9 +80,11 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
+
+ Binary txID = discharge.getTxnId();
if (discharge.getFail()) {
try {
- sessionSPI.rollbackCurrentTX(true);
+ sessionSPI.rollbackTX(txID, true);
delivery.disposition(new Accepted());
}
catch (Exception e) {
@@ -91,7 +93,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
}
else {
try {
- sessionSPI.commitCurrentTX();
+ sessionSPI.commitTX(txID);
delivery.disposition(new Accepted());
}
catch (ActiveMQAMQPException amqpE) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index c564a9e..173ff28 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -18,8 +18,10 @@ package org.proton.plug.context.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
@@ -130,7 +132,13 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
receiver.advance();
- sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer);
+ Transaction tx = null;
+ if (delivery.getRemoteState() instanceof TransactionalState) {
+
+ TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+ tx = this.sessionSPI.getTransaction(txState.getTxnId());
+ }
+ sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
flow(maxCreditAllocation, minCreditRefresh);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 2d91f37..e9bd123 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -21,6 +21,7 @@ import java.util.Objects;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -339,7 +340,9 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
if (remoteState != null) {
// If we are transactional then we need ack if the msg has been accepted
if (remoteState instanceof TransactionalState) {
+
TransactionalState txState = (TransactionalState) remoteState;
+ Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
if (txState.getOutcome() != null) {
Outcome outcome = txState.getOutcome();
if (outcome instanceof Accepted) {
@@ -353,7 +356,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// from dealer, a perf hit but a must
try {
- sessionSPI.ack(brokerConsumer, message);
+ sessionSPI.ack(tx, brokerConsumer, message);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
@@ -365,7 +368,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
//we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
// from dealer, a perf hit but a must
try {
- sessionSPI.ack(brokerConsumer, message);
+ sessionSPI.ack(null, brokerConsumer, message);
}
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
index 46178a9..983fa4e 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
@@ -19,6 +19,7 @@ package org.proton.plug.context.server;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Receiver;
@@ -60,6 +61,11 @@ public class ProtonServerSessionContext extends AbstractProtonSessionContext {
public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+
+ coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"),
+ Symbol.getSymbol("amqp:multi-txns-per-ssn"),
+ Symbol.getSymbol("amqp:multi-ssns-per-txn"));
+
receiver.setContext(transactionHandler);
receiver.open();
receiver.flow(100);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
index 8817310..576e61a 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -74,4 +74,7 @@ public interface ActiveMQAMQPProtocolMessageBundle {
@Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAMQPIllegalStateException errorCommittingCoordinator(String message);
+ @Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index da7b617..825b987 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -20,7 +20,9 @@ import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Session;
@@ -78,6 +80,21 @@ public class AbstractConnectionContextTest {
}
@Override
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return null;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+
+ }
+
+ @Override
public void onTransport(ByteBuf bytes, AMQPConnectionContext connection) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index 5de6e9d..a35e8ac 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -20,7 +20,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
@@ -29,6 +31,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
import org.proton.plug.context.server.ProtonServerConnectionContext;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.test.minimalserver.MinimalSessionSPI;
@@ -132,6 +135,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
return null;
}
+ @Override
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return null;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+
+ }
+
class ReturnSPI implements AMQPConnectionCallback {
@Override
@@ -140,6 +158,21 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
}
@Override
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return null;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+
+ }
+
+ @Override
public ServerSASL[] getSASLMechnisms() {
return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index be1571c..85e4c02 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
@@ -29,6 +31,7 @@ import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.util.ByteUtil;
@@ -75,6 +78,21 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
}
@Override
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return null;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+
+ }
+
+ @Override
public boolean validateConnection(Connection connection, SASLResult saslResult) {
return true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 1b9c919..6325ad7 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -24,7 +24,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Connection;
import org.jboss.logging.Logger;
import org.proton.plug.AMQPConnectionContext;
@@ -32,6 +34,7 @@ import org.proton.plug.AMQPConnectionCallback;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.SASLResult;
import org.proton.plug.ServerSASL;
+import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.sasl.AnonymousServerSASL;
import org.proton.plug.sasl.ServerSASLPlain;
import org.proton.plug.util.ByteUtil;
@@ -88,6 +91,21 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
}
@Override
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
+ return null;
+ }
+
+ @Override
+ public void removeTransaction(Binary txid) {
+
+ }
+
+ @Override
public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
final int bufferSize = bytes.writerIndex();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index f9a3533..d366c5b 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -22,7 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
@@ -123,16 +126,23 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
- public Binary getCurrentTXID() {
- return new Binary(new byte[]{1});
+ public Transaction getTransaction(Binary txid) {
+ return new TransactionImpl(new NullStorageManager());
}
@Override
- public void commitCurrentTX() {
+ public Binary newTransaction() {
+ return null;
+ }
+
+ @Override
+ public void commitTX(Binary txid) throws Exception {
+
}
@Override
- public void rollbackCurrentTX(boolean lastMessage) {
+ public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
+
}
@Override
@@ -141,7 +151,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
- public void ack(Object brokerConsumer, Object message) {
+ public void ack(Transaction tx, Object brokerConsumer, Object message) {
}
@@ -157,7 +167,7 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
}
@Override
- public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
+ public void serverSend(Transaction tx, Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) {
ProtonServerMessage serverMessage = new ProtonServerMessage();
serverMessage.decode(buffer.nioBuffer());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 953de1f..3521d71 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -133,6 +133,8 @@ public interface ServerSession extends SecurityAuth {
void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception;
+ RoutingStatus send(Transaction tx, ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
+
RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(ServerMessage message, boolean direct) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d20fa43..3ccfd16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1256,6 +1256,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
+ return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
+ }
+
+ public RoutingStatus send(Transaction tx, final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
@@ -1308,10 +1312,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (message.getAddress().equals(managementAddress)) {
// It's a management message
- handleManagementMessage(message, direct);
+ handleManagementMessage(tx, message, direct);
}
else {
- result = doSend(message, direct, noAutoCreateQueue);
+ result = doSend(tx, message, direct, noAutoCreateQueue);
}
return result;
}
@@ -1337,7 +1341,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
}
- doSend(currentLargeMessage, false, false);
+ doSend(tx, currentLargeMessage, false, false);
currentLargeMessage = null;
}
@@ -1526,7 +1530,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
started = s;
}
- private void handleManagementMessage(final ServerMessage message, final boolean direct) throws Exception {
+ private RoutingStatus handleManagementMessage(final Transaction tx, final ServerMessage message, final boolean direct) throws Exception {
try {
securityCheck(message.getAddress(), CheckType.MANAGE, this);
}
@@ -1544,8 +1548,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (replyTo != null) {
reply.setAddress(replyTo);
- doSend(reply, direct, false);
+ doSend(tx, reply, direct, false);
}
+
+ return RoutingStatus.OK;
}
private void doRollback(final boolean clientFailed,
@@ -1600,7 +1606,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
theTx.rollback();
}
- protected RoutingStatus doSend(final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
+ public RoutingStatus doSend(final Transaction tx, final ServerMessage msg, final boolean direct, final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
// check the user has write access to this address.
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 1454dd9..3e2c5d7 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -228,7 +228,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
}
- serializer.shutdown();
+ serializer.shutdownNow();
+ try {
+ if (!serializer.awaitTermination(10, TimeUnit.SECONDS)) {
+ LOG.warn("Serializer didn't shutdown cleanly");
+ }
+ }
+ catch (InterruptedException e) {
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 320d174..060fc4e 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -60,7 +60,8 @@ public class AmqpMessage {
* Creates a new AmqpMessage that wraps the information necessary to handle
* an outgoing message.
*
- * @param message the Proton message that is to be sent.
+ * @param message
+ * the Proton message that is to be sent.
*/
public AmqpMessage(Message message) {
this(null, message, null);
@@ -70,9 +71,12 @@ public class AmqpMessage {
* Creates a new AmqpMessage that wraps the information necessary to handle
* an incoming delivery.
*
- * @param receiver the AmqpReceiver that received this message.
- * @param message the Proton message that was received.
- * @param delivery the Delivery instance that produced this message.
+ * @param receiver
+ * the AmqpReceiver that received this message.
+ * @param message
+ * the Proton message that was received.
+ * @param delivery
+ * the Delivery instance that produced this message.
*/
@SuppressWarnings("unchecked")
public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
@@ -136,10 +140,29 @@ public class AmqpMessage {
}
/**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @param txnSession
+ * The session that is used to manage acceptance of the message.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept(AmqpSession txnSession) throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't accept non-received message.");
+ }
+
+ receiver.accept(delivery, txnSession);
+ }
+
+ /**
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
*
- * @param deliveryFailed indicates that the delivery failed for some reason.
- * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
+ * @param deliveryFailed
+ * indicates that the delivery failed for some reason.
+ * @param undeliverableHere
+ * marks the delivery as not able to be processed by the link it was sent to.
+ *
* @throws Exception if an error occurs during the process.
*/
public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
@@ -166,9 +189,35 @@ public class AmqpMessage {
//----- Convenience methods for constructing outbound messages -----------//
/**
+ * Sets the address which is applied to the AMQP message To field in the message properties
+ *
+ * @param address
+ * The address that should be applied in the Message To field.
+ */
+ public void setAddress(String address) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setAddress(address);
+ }
+
+ /**
+ * Returns the address that was set in the Message To field.
+ *
+ * @return the address in String form, or null if not set.
+ */
+ public String getAddress() {
+ if (message.getProperties() == null) {
+ return null;
+ }
+
+ return message.getProperties().getTo();
+ }
+
+ /**
* Sets the MessageId property on an outbound message using the provided String
*
- * @param messageId the String message ID value to set.
+ * @param messageId
+ * the String message ID value to set.
*/
public void setMessageId(String messageId) {
checkReadOnly();
@@ -207,7 +256,8 @@ public class AmqpMessage {
/**
* Sets the MessageId property on an outbound message using the provided value
*
- * @param messageId the message ID value to set.
+ * @param messageId
+ * the message ID value to set.
*/
public void setRawMessageId(Object messageId) {
checkReadOnly();
@@ -218,7 +268,8 @@ public class AmqpMessage {
/**
* Sets the CorrelationId property on an outbound message using the provided String
*
- * @param correlationId the String Correlation ID value to set.
+ * @param correlationId
+ * the String Correlation ID value to set.
*/
public void setCorrelationId(String correlationId) {
checkReadOnly();
@@ -257,7 +308,8 @@ public class AmqpMessage {
/**
* Sets the CorrelationId property on an outbound message using the provided value
*
- * @param correlationId the correlation ID value to set.
+ * @param correlationId
+ * the correlation ID value to set.
*/
public void setRawCorrelationId(Object correlationId) {
checkReadOnly();
@@ -268,7 +320,8 @@ public class AmqpMessage {
/**
* Sets the GroupId property on an outbound message using the provided String
*
- * @param groupId the String Group ID value to set.
+ * @param groupId
+ * the String Group ID value to set.
*/
public void setGroupId(String groupId) {
checkReadOnly();
@@ -293,7 +346,8 @@ public class AmqpMessage {
/**
* Sets the durable header on the outgoing message.
*
- * @param durable the boolean durable value to set.
+ * @param durable
+ * the boolean durable value to set.
*/
public void setDurable(boolean durable) {
checkReadOnly();
@@ -318,8 +372,10 @@ public class AmqpMessage {
/**
* Sets a given application property on an outbound message.
*
- * @param key the name to assign the new property.
- * @param value the value to set for the named property.
+ * @param key
+ * the name to assign the new property.
+ * @param value
+ * the value to set for the named property.
*/
public void setApplicationProperty(String key, Object value) {
checkReadOnly();
@@ -331,8 +387,10 @@ public class AmqpMessage {
* Gets the application property that is mapped to the given name or null
* if no property has been set with that name.
*
- * @param key the name used to lookup the property in the application properties.
- * @return the propety value or null if not set.
+ * @param key
+ * the name used to lookup the property in the application properties.
+ *
+ * @return the property value or null if not set.
*/
public Object getApplicationProperty(String key) {
if (applicationPropertiesMap == null) {
@@ -346,8 +404,10 @@ public class AmqpMessage {
* Perform a proper annotation set on the AMQP Message based on a Symbol key and
* the target value to append to the current annotations.
*
- * @param key The name of the Symbol whose value is being set.
- * @param value The new value to set in the annotations of this message.
+ * @param key
+ * The name of the Symbol whose value is being set.
+ * @param value
+ * The new value to set in the annotations of this message.
*/
public void setMessageAnnotation(String key, Object value) {
checkReadOnly();
@@ -360,7 +420,9 @@ public class AmqpMessage {
* that annotation name. If the message annotations have not been created yet
* then this method will always return null.
*
- * @param key the Symbol name that should be looked up in the message annotations.
+ * @param key
+ * the Symbol name that should be looked up in the message annotations.
+ *
* @return the value of the annotation if it exists, or null if not set or not accessible.
*/
public Object getMessageAnnotation(String key) {
@@ -375,8 +437,10 @@ public class AmqpMessage {
* Perform a proper delivery annotation set on the AMQP Message based on a Symbol
* key and the target value to append to the current delivery annotations.
*
- * @param key The name of the Symbol whose value is being set.
- * @param value The new value to set in the delivery annotations of this message.
+ * @param key
+ * The name of the Symbol whose value is being set.
+ * @param value
+ * The new value to set in the delivery annotations of this message.
*/
public void setDeliveryAnnotation(String key, Object value) {
checkReadOnly();
@@ -389,7 +453,9 @@ public class AmqpMessage {
* that annotation name. If the message annotations have not been created yet
* then this method will always return null.
*
- * @param key the Symbol name that should be looked up in the message annotations.
+ * @param key
+ * the Symbol name that should be looked up in the message annotations.
+ *
* @return the value of the annotation if it exists, or null if not set or not accessible.
*/
public Object getDeliveryAnnotation(String key) {
@@ -406,7 +472,9 @@ public class AmqpMessage {
* Sets a String value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
- * @param value the String value to store in the Message body.
+ * @param value
+ * the String value to store in the Message body.
+ *
* @throws IllegalStateException if the message is read only.
*/
public void setText(String value) throws IllegalStateException {
@@ -419,7 +487,9 @@ public class AmqpMessage {
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
- * @param bytes the byte array value to store in the Message body.
+ * @param bytes
+ * the byte array value to store in the Message body.
+ *
* @throws IllegalStateException if the message is read only.
*/
public void setBytes(byte[] bytes) throws IllegalStateException {
@@ -432,7 +502,9 @@ public class AmqpMessage {
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
- * @param described the byte array value to store in the Message body.
+ * @param described
+ * the DescribedType value to store in the Message body.
+ *
* @throws IllegalStateException if the message is read only.
*/
public void setDescribedType(DescribedType described) throws IllegalStateException {
@@ -445,6 +517,7 @@ public class AmqpMessage {
* Attempts to retrieve the message body as an DescribedType instance.
*
* @return an DescribedType instance if one is stored in the message body.
+ *
* @throws NoSuchElementException if the body does not contain a DescribedType.
*/
public DescribedType getDescribedType() throws NoSuchElementException {
@@ -482,21 +555,21 @@ public class AmqpMessage {
private void lazyCreateMessageAnnotations() {
if (messageAnnotationsMap == null) {
- messageAnnotationsMap = new HashMap<>();
+ messageAnnotationsMap = new HashMap<Symbol, Object>();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
}
}
private void lazyCreateDeliveryAnnotations() {
if (deliveryAnnotationsMap == null) {
- deliveryAnnotationsMap = new HashMap<>();
+ deliveryAnnotationsMap = new HashMap<Symbol, Object>();
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
}
}
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
- applicationPropertiesMap = new HashMap<>();
+ applicationPropertiesMap = new HashMap<String, Object>();
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 9f3bff2..2802751 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -208,6 +208,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* it is returned immediately otherwise this methods return null without waiting.
*
* @return a newly received message or null if there is no currently available message.
+ *
* @throws Exception if an error occurs during the receive attempt.
*/
public AmqpMessage receiveNoWait() throws Exception {
@@ -219,6 +220,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* Request a remote peer send a Message to this client waiting until one arrives.
*
* @return the pulled AmqpMessage or null if none was pulled from the remote.
+ *
* @throws IOException if an error occurs
*/
public AmqpMessage pull() throws IOException {
@@ -402,12 +404,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery) throws IOException {
+ accept(delivery, this.session);
+ }
+
+ /**
+ * Accepts a message that was dispatched under the given Delivery instance.
+ *
+ * This method allows for the session that is used in the accept to be specified by the
+ * caller. This allows for an accepted message to be involved in a transaction that is
+ * being managed by some other session other than the one that created this receiver.
+ *
+ * @param delivery
+ * the Delivery instance to accept.
+ * @param session
+ * the session under which the message is being accepted.
+ *
+ * @throws IOException if an error occurs while sending the accept.
+ */
+ public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
checkClosed();
if (delivery == null) {
throw new IllegalArgumentException("Delivery to accept cannot be null");
}
+ if (session == null) {
+ throw new IllegalArgumentException("Session given cannot be null");
+ }
+
+ if (session.getConnection() != this.session.getConnection()) {
+ throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
+ }
+
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 5ae2948..ed83e02 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -118,6 +118,18 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
*/
public void send(final AmqpMessage message) throws IOException {
checkClosed();
+ send(message, null);
+ }
+
+ /**
+ * Sends the given message to this sender's assigned address using the supplied transaction ID.
+ *
+ * @param message the message to send.
+ * @param txId the transaction ID to assign the outgoing send.
+ * @throws IOException if an error occurs during the send.
+ */
+ public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
+ checkClosed();
final ClientFuture sendRequest = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@@ -125,7 +137,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override
public void run() {
try {
- doSend(message, sendRequest);
+ doSend(message, sendRequest, txId);
session.pumpToProtonTransport(sendRequest);
}
catch (Exception e) {
@@ -316,7 +328,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
}
- private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+ private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
LOG.trace("Producer sending message: {}", message);
Delivery delivery = null;
@@ -330,8 +342,15 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
delivery.setContext(request);
- if (session.isInTransaction()) {
- Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+ Binary amqpTxId = null;
+ if (txId != null) {
+ amqpTxId = txId.getRemoteTxId();
+ }
+ else if (session.isInTransaction()) {
+ amqpTxId = session.getTransactionId().getRemoteTxId();
+ }
+
+ if (amqpTxId != null) {
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 82b6aec..755ecf8 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -408,11 +408,15 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
connection.pumpToProtonTransport(request);
}
- AmqpTransactionId getTransactionId() {
- return txContext.getTransactionId();
+ public AmqpTransactionId getTransactionId() {
+ if (txContext != null && txContext.isInTransaction()) {
+ return txContext.getTransactionId();
+ }
+
+ return null;
}
- public AmqpTransactionContext getTransactionContext() {
+ AmqpTransactionContext getTransactionContext() {
return txContext;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/113c0c93/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
new file mode 100644
index 0000000..8fa140a
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test support class for tests that will be using the AMQP Proton wrapper client.
+ * This is to make it easier to migrate tests from ActiveMQ5.
+ * <p>
+ * A broker is created and started before every test and stopped afterwards. Connections
+ * handed to {@link #addConnection(AmqpConnection)} are closed automatically during
+ * {@link #tearDown()}.
+ */
+public class AmqpClientTestSupport extends ActiveMQTestBase {
+
+   /** Broker instance created in {@link #setUp()} and stopped in {@link #tearDown()}. */
+   ActiveMQServer server;
+
+   /** Connections registered via {@link #addConnection(AmqpConnection)} for automatic cleanup. */
+   LinkedList<AmqpConnection> connections = new LinkedList<>();
+
+   private String connectorScheme = "amqp";
+   private boolean useSSL;
+
+   /**
+    * Creates a fixture using the default "amqp" connector scheme without SSL.
+    */
+   public AmqpClientTestSupport() {
+   }
+
+   /**
+    * Creates a fixture for the given connector scheme.
+    *
+    * @param connectorScheme
+    *        the scheme name inspected by the {@code isUse*Connector()} helpers.
+    * @param useSSL
+    *        whether the client URI should use a secure transport.
+    */
+   public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
+      this.connectorScheme = connectorScheme;
+      this.useSSL = useSSL;
+   }
+
+   /**
+    * Registers a connection so that it is closed when the test is torn down.
+    *
+    * @param connection
+    *        the connection to track.
+    *
+    * @return the same connection, to allow call chaining.
+    */
+   protected AmqpConnection addConnection(AmqpConnection connection) {
+      connections.add(connection);
+      return connection;
+   }
+
+   /**
+    * Runs the base class set up and then creates and starts the broker.
+    *
+    * @throws Exception if the base set up or the broker start fails.
+    */
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.start();
+   }
+
+   /**
+    * Closes every tracked connection, stops the broker and finally runs the base class
+    * tear down.
+    * <p>
+    * The resources owned by this class are released before delegating to the super class
+    * so that the base tear down does not run while connections and the broker are still
+    * live; the {@code finally} block guarantees the base tear down runs even when the
+    * broker fails to stop.
+    *
+    * @throws Exception if stopping the broker or the base tear down fails.
+    */
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      try {
+         for (AmqpConnection conn : connections) {
+            try {
+               conn.close();
+            }
+            catch (Throwable e) {
+               // A failed close must not prevent the remaining clean up; just report it.
+               e.printStackTrace();
+            }
+         }
+         connections.clear();
+
+         // server is null when setUp() failed before the broker was created
+         if (server != null) {
+            server.stop();
+         }
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   /**
+    * Looks up a queue on the broker.
+    *
+    * @param queueName
+    *        the name of the queue to locate.
+    *
+    * @return the result of the broker lookup for the given name.
+    */
+   public Queue getProxyToQueue(String queueName) {
+      return server.locateQueue(SimpleString.toSimpleString(queueName));
+   }
+
+   /**
+    * @return the value of {@code getName()} prefixed with "jms.queue.".
+    */
+   public String getTestName() {
+      return "jms.queue." + getName();
+   }
+
+   public String getConnectorScheme() {
+      return connectorScheme;
+   }
+
+   public boolean isUseSSL() {
+      return useSSL;
+   }
+
+   /**
+    * Hook for subclasses to append a query string to the broker URI.
+    *
+    * @return the URI options without the leading '?', empty by default.
+    */
+   public String getAmqpConnectionURIOptions() {
+      return "";
+   }
+
+   protected boolean isUseTcpConnector() {
+      return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
+   }
+
+   protected boolean isUseSslConnector() {
+      return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
+   }
+
+   protected boolean isUseNioConnector() {
+      return !isUseSSL() && connectorScheme.contains("nio");
+   }
+
+   protected boolean isUseNioPlusSslConnector() {
+      return isUseSSL() && connectorScheme.contains("nio");
+   }
+
+   protected boolean isUseWsConnector() {
+      return !isUseSSL() && connectorScheme.contains("ws");
+   }
+
+   protected boolean isUseWssConnector() {
+      return isUseSSL() && connectorScheme.contains("wss");
+   }
+
+   /**
+    * Builds the URI used by the test client to reach the broker on 127.0.0.1:61616,
+    * appending {@link #getAmqpConnectionURIOptions()} as the query string when non-empty.
+    *
+    * @return the broker connection URI.
+    *
+    * @throws RuntimeException wrapping the original failure if the URI cannot be built.
+    */
+   public URI getBrokerAmqpConnectionURI() {
+      // Currently always false, so the ws/wss branches below are never taken.
+      boolean webSocket = false;
+
+      try {
+         int port = 61616;
+
+         String uri = null;
+
+         if (isUseSSL()) {
+            if (webSocket) {
+               uri = "wss://127.0.0.1:" + port;
+            }
+            else {
+               uri = "ssl://127.0.0.1:" + port;
+            }
+         }
+         else {
+            if (webSocket) {
+               uri = "ws://127.0.0.1:" + port;
+            }
+            else {
+               uri = "tcp://127.0.0.1:" + port;
+            }
+         }
+
+         if (!getAmqpConnectionURIOptions().isEmpty()) {
+            uri = uri + "?" + getAmqpConnectionURIOptions();
+         }
+
+         return new URI(uri);
+      }
+      catch (Exception e) {
+         // Keep the cause so a malformed URI is diagnosable from the test failure.
+         throw new RuntimeException(e);
+      }
+   }
+
+   public AmqpConnection createAmqpConnection() throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI());
+   }
+
+   public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+      return createAmqpConnection(brokerURI, null, null);
+   }
+
+   /**
+    * Creates a client for the given broker and credentials and connects it.
+    * The returned connection is not tracked; callers that want automatic clean up
+    * must pass it to {@link #addConnection(AmqpConnection)}.
+    */
+   public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+      return createAmqpClient(brokerURI, username, password).connect();
+   }
+
+   public AmqpClient createAmqpClient() throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+   }
+
+   public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+      return createAmqpClient(brokerURI, null, null);
+   }
+
+   public AmqpClient createAmqpClient(String username, String password) throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   /**
+    * Creates an unconnected client for the given broker URI and credentials.
+    */
+   public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+      return new AmqpClient(brokerURI, username, password);
+   }
+}
[3/3] activemq-artemis git commit: This closes #782
Posted by jb...@apache.org.
This closes #782
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e790c785
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e790c785
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e790c785
Branch: refs/heads/master
Commit: e790c78583bb51c4ae2a4de1d9b3513c243d212b
Parents: 5ea53c4 113c0c9
Author: jbertram <jb...@apache.com>
Authored: Wed Sep 21 19:21:04 2016 -0500
Committer: jbertram <jb...@apache.com>
Committed: Wed Sep 21 19:21:04 2016 -0500
----------------------------------------------------------------------
.../plug/ActiveMQProtonConnectionCallback.java | 71 ++-
.../plug/ProtonSessionIntegrationCallback.java | 84 ++-
.../org/proton/plug/AMQPConnectionCallback.java | 11 +
.../org/proton/plug/AMQPSessionCallback.java | 18 +-
.../context/AbstractProtonSessionContext.java | 1 -
.../plug/context/ProtonTransactionHandler.java | 8 +-
.../server/ProtonServerReceiverContext.java | 10 +-
.../server/ProtonServerSenderContext.java | 7 +-
.../server/ProtonServerSessionContext.java | 6 +
.../ActiveMQAMQPProtocolMessageBundle.java | 3 +
.../context/AbstractConnectionContextTest.java | 17 +
.../proton/plug/test/invm/ProtonINVMSPI.java | 33 +
.../plug/test/minimalclient/AMQPClientSPI.java | 18 +
.../minimalserver/MinimalConnectionSPI.java | 18 +
.../test/minimalserver/MinimalSessionSPI.java | 22 +-
.../artemis/core/server/ServerSession.java | 2 +
.../core/server/impl/ServerSessionImpl.java | 18 +-
.../transport/amqp/client/AmqpConnection.java | 9 +-
.../transport/amqp/client/AmqpMessage.java | 129 +++-
.../transport/amqp/client/AmqpReceiver.java | 28 +
.../transport/amqp/client/AmqpSender.java | 27 +-
.../transport/amqp/client/AmqpSession.java | 12 +-
.../integration/amqp/AmqpClientTestSupport.java | 194 ++++++
.../integration/amqp/AmqpTransactionTest.java | 625 +++++++++++++++++++
.../tests/integration/proton/ProtonTest.java | 3 +-
.../integration/proton/ProtonTestBase.java | 1 -
.../integration/proton/ProtonTestForHeader.java | 3 -
27 files changed, 1263 insertions(+), 115 deletions(-)
----------------------------------------------------------------------