You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/14 22:24:02 UTC
activemq git commit: NO-JIRA AMQP Test updates
Repository: activemq
Updated Branches:
refs/heads/master da9fedead -> b4ab0e1af
NO-JIRA AMQP Test updates
Adds support for doing sends and receives that are enrolled in a
transaction created in a session other than the session that created the
sender or receiver. Adds some tests that show this in action.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b4ab0e1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b4ab0e1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b4ab0e1a
Branch: refs/heads/master
Commit: b4ab0e1af9fb0bb56b8a4a9cfc948727a7b92e0c
Parents: da9fede
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 14 18:23:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 14 18:23:52 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 19 +-
.../transport/amqp/client/AmqpReceiver.java | 26 +++
.../transport/amqp/client/AmqpSender.java | 29 ++-
.../transport/amqp/client/AmqpSession.java | 8 +-
.../amqp/interop/AmqpTransactionTest.java | 194 +++++++++++++++++++
5 files changed, 269 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 99f4cfb..8b378e1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -140,6 +140,22 @@ public class AmqpMessage {
}
/**
+ * Accepts the message marking it as consumed on the remote peer.
+ *
+ * @param session
+ * The session that is used to manage acceptance of the message.
+ *
+ * @throws Exception if an error occurs during the accept.
+ */
+ public void accept(AmqpSession txnSession) throws Exception {
+ if (receiver == null) {
+ throw new IllegalStateException("Can't accept non-received message.");
+ }
+
+ receiver.accept(delivery, txnSession);
+ }
+
+ /**
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
*
* @param deliveryFailed
@@ -374,7 +390,7 @@ public class AmqpMessage {
* @param key
* the name used to lookup the property in the application properties.
*
- * @return the propety value or null if not set.
+ * @return the property value or null if not set.
*/
public Object getApplicationProperty(String key) {
if (applicationPropertiesMap == null) {
@@ -560,6 +576,7 @@ public class AmqpMessage {
message.setHeader(new Header());
}
}
+
private void lazyCreateProperties() {
if (message.getProperties() == null) {
message.setProperties(new Properties());
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 77a529d..999e033 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
* @throws IOException if an error occurs while sending the accept.
*/
public void accept(final Delivery delivery) throws IOException {
+ accept(delivery, this.session);
+ }
+
+ /**
+ * Accepts a message that was dispatched under the given Delivery instance.
+ *
+ * This method allows for the session that is used in the accept to be specified by the
+ * caller. This allows for an accepted message to be involved in a transaction that is
+ * being managed by some other session other than the one that created this receiver.
+ *
+ * @param delivery
+ * the Delivery instance to accept.
+ * @param session
+ * the session under which the message is being accepted.
+ *
+ * @throws IOException if an error occurs while sending the accept.
+ */
+ public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
checkClosed();
if (delivery == null) {
throw new IllegalArgumentException("Delivery to accept cannot be null");
}
+ if (session == null) {
+ throw new IllegalArgumentException("Session given cannot be null");
+ }
+
+ if (session.getConnection() != this.session.getConnection()) {
+ throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
+ }
+
final ClientFuture request = new ClientFuture();
session.getScheduler().execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index f9d6435..dd3a371 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -127,6 +127,21 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
*/
public void send(final AmqpMessage message) throws IOException {
checkClosed();
+ send(message, null);
+ }
+
+ /**
+ * Sends the given message to this senders assigned address using the supplied transaction ID.
+ *
+ * @param message
+ * the message to send.
+ * @param txId
+ * the transaction ID to assign the outgoing send.
+ *
+ * @throws IOException if an error occurs during the send.
+ */
+ public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
+ checkClosed();
final ClientFuture sendRequest = new ClientFuture();
session.getScheduler().execute(new Runnable() {
@@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override
public void run() {
try {
- doSend(message, sendRequest);
+ doSend(message, sendRequest, txId);
session.pumpToProtonTransport(sendRequest);
} catch (Exception e) {
sendRequest.onFailure(e);
@@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
}
- private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+ private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
LOG.trace("Producer sending message: {}", message);
Delivery delivery = null;
@@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
delivery.setContext(request);
- if (session.isInTransaction()) {
- Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+ Binary amqpTxId = null;
+ if (txId != null) {
+ amqpTxId = txId.getRemoteTxId();
+ } else if (session.isInTransaction()) {
+ amqpTxId = session.getTransactionId().getRemoteTxId();
+ }
+
+ if (amqpTxId != null) {
TransactionalState state = new TransactionalState();
state.setTxnId(amqpTxId);
delivery.disposition(state);
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index ae99f65..3804603 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
connection.pumpToProtonTransport(request);
}
- AmqpTransactionId getTransactionId() {
- return txContext.getTransactionId();
+ public AmqpTransactionId getTransactionId() {
+ if (txContext != null && txContext.isInTransaction()) {
+ return txContext.getTransactionId();
+ }
+
+ return null;
}
AmqpTransactionContext getTransactionContext() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/b4ab0e1a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index a998290..97089a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Load up the Queue with some messages
+ {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.send(message);
+ sender.send(message);
+ sender.close();
+ }
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+ final QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(3, queue.getQueueSize());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ assertTrue(txnSession.isInTransaction());
+
+ receiver1.flow(1);
+ receiver2.flow(1);
+ receiver3.flow(1);
+
+ AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+ message1.accept(txnSession);
+ message2.accept(txnSession);
+ message3.accept(txnSession);
+
+ assertEquals(3, queue.getQueueSize());
+
+ txnSession.commit();
+
+ assertEquals(0, queue.getQueueSize());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Load up the Queue with some messages
+ {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.send(message);
+ sender.send(message);
+ sender.close();
+ }
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+ AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+ final QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(3, queue.getQueueSize());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ assertTrue(txnSession.isInTransaction());
+
+ receiver1.flow(1);
+ receiver2.flow(1);
+ receiver3.flow(1);
+
+ AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+ AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+ message1.accept(txnSession);
+ message2.accept(txnSession);
+ message3.accept(txnSession);
+
+ assertEquals(3, queue.getQueueSize());
+
+ txnSession.rollback();
+
+ assertEquals(3, queue.getQueueSize());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+ AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+ AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+ final QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(0, queue.getQueueSize());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+
+ assertTrue(txnSession.isInTransaction());
+
+ sender1.send(message, txnSession.getTransactionId());
+ sender2.send(message, txnSession.getTransactionId());
+ sender3.send(message, txnSession.getTransactionId());
+
+ assertEquals(0, queue.getQueueSize());
+
+ txnSession.commit();
+
+ assertEquals(3, queue.getQueueSize());
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+
+ // Root TXN session controls all TXN send lifetimes.
+ AmqpSession txnSession = connection.createSession();
+
+ // Create some sender sessions
+ AmqpSession session1 = connection.createSession();
+ AmqpSession session2 = connection.createSession();
+ AmqpSession session3 = connection.createSession();
+
+ // Sender linked to each session
+ AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+ AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+ AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+ final QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(0, queue.getQueueSize());
+
+ // Begin the transaction that all senders will operate in.
+ txnSession.begin();
+
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+
+ assertTrue(txnSession.isInTransaction());
+
+ sender1.send(message, txnSession.getTransactionId());
+ sender2.send(message, txnSession.getTransactionId());
+ sender3.send(message, txnSession.getTransactionId());
+
+ assertEquals(0, queue.getQueueSize());
+
+ txnSession.rollback();
+
+ assertEquals(0, queue.getQueueSize());
+ }
}