You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/07 12:41:25 UTC

[qpid-broker-j] branch master updated: QPID-8309: [Broker-J] Fix local transaction discharge when async underlying store transaction is in progress

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a85e19  QPID-8309: [Broker-J] Fix local transaction discharge when async underlying store transaction is in progress
2a85e19 is described below

commit 2a85e19a0730cf66ffc88f97b3a028c196fb582d
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Tue May 7 13:14:59 2019 +0100

    QPID-8309: [Broker-J] Fix local transaction discharge when async underlying store transaction is in progress
---
 .../apache/qpid/server/txn/LocalTransaction.java   |   8 +-
 .../v0_8/extension/tx/AsyncTransactionTest.java    | 170 +++++++++++++++++++++
 2 files changed, 174 insertions(+), 4 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 1c24c3e..eaf1db2 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -377,6 +377,7 @@ public class LocalTransaction implements ServerTransaction
     @Override
     public void commit(Runnable immediateAction)
     {
+        sync();
         if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
         {
             LocalTransactionState state = _state.get();
@@ -386,8 +387,6 @@ public class LocalTransaction implements ServerTransaction
             throw new IllegalStateException(message);
         }
 
-
-        sync();
         try
         {
             if(_transaction != null)
@@ -418,6 +417,7 @@ public class LocalTransaction implements ServerTransaction
 
     public void commitAsync(final Runnable deferred)
     {
+        sync();
         if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
         {
             LocalTransactionState state = _state.get();
@@ -426,7 +426,7 @@ public class LocalTransaction implements ServerTransaction
                     : String.format("Cannot commit transaction with state '%s'", state);
             throw new IllegalStateException(message);
         }
-        sync();
+
         if(_transaction != null)
         {
 
@@ -480,6 +480,7 @@ public class LocalTransaction implements ServerTransaction
     @Override
     public void rollback()
     {
+        sync();
         if (!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING)
             && !_state.compareAndSet(LocalTransactionState.ROLLBACK_ONLY, LocalTransactionState.DISCHARGING)
             && _state.get() != LocalTransactionState.DISCHARGING)
@@ -488,7 +489,6 @@ public class LocalTransaction implements ServerTransaction
                                                           _state.get()));
         }
 
-        sync();
         try
         {
             if(_transaction != null)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
new file mode 100644
index 0000000..446e78c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.tx;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxRollbackOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class AsyncTransactionTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final int MESSAGE_COUNT = 10;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void subsequentCommit() throws Exception
+    {
+        publishPersistentMessages();
+        assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = createConsumerInteraction(transport);
+
+            acknowledgeDeliveries(interaction, receiveBasicDeliverBodies(interaction));
+            interaction.tx().commit();
+
+            // subsequent commit
+            interaction.tx().commit();
+
+            interaction.consumeResponse(TxCommitOkBody.class);
+            interaction.consumeResponse(TxCommitOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    public void subsequentRollback() throws Exception
+    {
+        publishPersistentMessages();
+        assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = createConsumerInteraction(transport);
+
+            acknowledgeDeliveries(interaction, receiveBasicDeliverBodies(interaction));
+            interaction.tx().commit();
+
+            // subsequent rollback
+            interaction.tx().rollback();
+
+            interaction.consumeResponse(TxCommitOkBody.class);
+            interaction.consumeResponse(TxRollbackOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+
+    private void publishPersistentMessages() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+            for (int i = 0; i < MESSAGE_COUNT; i++)
+            {
+                interaction.basic()
+                           .publishExchange("")
+                           .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                           .contentHeaderPropertiesDeliveryMode(BasicContentHeaderProperties.PERSISTENT)
+                           .content("message" + 1)
+                           .publishMessage();
+            }
+            interaction.exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+        }
+    }
+
+    private Interaction createConsumerInteraction(final FrameTransport transport)
+            throws Exception
+    {
+        final Interaction interaction = transport.newInteraction();
+        interaction.openAnonymousConnection()
+                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                   .tx().select().consumeResponse(TxSelectOkBody.class)
+                   .basic().qosPrefetchCount(MESSAGE_COUNT)
+                   .qos()
+                   .consumeResponse(BasicQosOkBody.class)
+                   .channel().flow(true)
+                   .consumeResponse(ChannelFlowOkBody.class)
+                   .basic()
+                   .consumeConsumerTag("A")
+                   .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                   .consume()
+                   .consumeResponse(BasicConsumeOkBody.class);
+        return interaction;
+    }
+
+    private BasicDeliverBody[] receiveBasicDeliverBodies(final Interaction interaction)
+            throws Exception
+    {
+        final BasicDeliverBody[] deliveries = new BasicDeliverBody[MESSAGE_COUNT];
+        for (int i = 0; i < MESSAGE_COUNT; i++)
+        {
+            deliveries[i] = interaction.consumeResponse(BasicDeliverBody.class).getLatestResponse(BasicDeliverBody.class);
+            interaction.consumeResponse(ContentHeaderBody.class).consumeResponse(ContentBody.class);
+        }
+        return deliveries;
+    }
+
+    private void acknowledgeDeliveries(final Interaction interaction, final BasicDeliverBody[] deliveries)
+            throws Exception
+    {
+        for (final BasicDeliverBody delivery : deliveries)
+        {
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag()).ack();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org