You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/07 12:48:44 UTC
[qpid-broker-j] branch 7.1.x updated: QPID-8309: [Broker-J] Fix
local transaction discharge when async underlying store transaction is in
progress
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/7.1.x by this push:
new 972001c QPID-8309: [Broker-J] Fix local transaction discharge when async underlying store transaction is in progress
972001c is described below
commit 972001cc06d3ed9cd761c84a8bf6dc74d18652c8
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Tue May 7 13:14:59 2019 +0100
QPID-8309: [Broker-J] Fix local transaction discharge when async underlying store transaction is in progress
(cherry picked from commit 2a85e19a0730cf66ffc88f97b3a028c196fb582d)
---
.../apache/qpid/server/txn/LocalTransaction.java | 8 +-
.../v0_8/extension/tx/AsyncTransactionTest.java | 170 +++++++++++++++++++++
2 files changed, 174 insertions(+), 4 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 1c24c3e..eaf1db2 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -377,6 +377,7 @@ public class LocalTransaction implements ServerTransaction
@Override
public void commit(Runnable immediateAction)
{
+ sync();
if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
{
LocalTransactionState state = _state.get();
@@ -386,8 +387,6 @@ public class LocalTransaction implements ServerTransaction
throw new IllegalStateException(message);
}
-
- sync();
try
{
if(_transaction != null)
@@ -418,6 +417,7 @@ public class LocalTransaction implements ServerTransaction
public void commitAsync(final Runnable deferred)
{
+ sync();
if(!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING))
{
LocalTransactionState state = _state.get();
@@ -426,7 +426,7 @@ public class LocalTransaction implements ServerTransaction
: String.format("Cannot commit transaction with state '%s'", state);
throw new IllegalStateException(message);
}
- sync();
+
if(_transaction != null)
{
@@ -480,6 +480,7 @@ public class LocalTransaction implements ServerTransaction
@Override
public void rollback()
{
+ sync();
if (!_state.compareAndSet(LocalTransactionState.ACTIVE, LocalTransactionState.DISCHARGING)
&& !_state.compareAndSet(LocalTransactionState.ROLLBACK_ONLY, LocalTransactionState.DISCHARGING)
&& _state.get() != LocalTransactionState.DISCHARGING)
@@ -488,7 +489,6 @@ public class LocalTransaction implements ServerTransaction
_state.get()));
}
- sync();
try
{
if(_transaction != null)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
new file mode 100644
index 0000000..446e78c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.tx;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxRollbackOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class AsyncTransactionTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final int MESSAGE_COUNT = 10;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void subsequentCommit() throws Exception
+ {
+ publishPersistentMessages();
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = createConsumerInteraction(transport);
+
+ acknowledgeDeliveries(interaction, receiveBasicDeliverBodies(interaction));
+ interaction.tx().commit();
+
+ // subsequent commit
+ interaction.tx().commit();
+
+ interaction.consumeResponse(TxCommitOkBody.class);
+ interaction.consumeResponse(TxCommitOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ public void subsequentRollback() throws Exception
+ {
+ publishPersistentMessages();
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = createConsumerInteraction(transport);
+
+ acknowledgeDeliveries(interaction, receiveBasicDeliverBodies(interaction));
+ interaction.tx().commit();
+
+ // subsequent rollback
+ interaction.tx().rollback();
+
+ interaction.consumeResponse(TxCommitOkBody.class);
+ interaction.consumeResponse(TxRollbackOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+
+ private void publishPersistentMessages() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class);
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ interaction.basic()
+ .publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesDeliveryMode(BasicContentHeaderProperties.PERSISTENT)
+ .content("message" + 1)
+ .publishMessage();
+ }
+ interaction.exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+ }
+ }
+
+ private Interaction createConsumerInteraction(final FrameTransport transport)
+ throws Exception
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select().consumeResponse(TxSelectOkBody.class)
+ .basic().qosPrefetchCount(MESSAGE_COUNT)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic()
+ .consumeConsumerTag("A")
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class);
+ return interaction;
+ }
+
+ private BasicDeliverBody[] receiveBasicDeliverBodies(final Interaction interaction)
+ throws Exception
+ {
+ final BasicDeliverBody[] deliveries = new BasicDeliverBody[MESSAGE_COUNT];
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ deliveries[i] = interaction.consumeResponse(BasicDeliverBody.class).getLatestResponse(BasicDeliverBody.class);
+ interaction.consumeResponse(ContentHeaderBody.class).consumeResponse(ContentBody.class);
+ }
+ return deliveries;
+ }
+
+ private void acknowledgeDeliveries(final Interaction interaction, final BasicDeliverBody[] deliveries)
+ throws Exception
+ {
+ for (final BasicDeliverBody delivery : deliveries)
+ {
+ interaction.basic().ackDeliveryTag(delivery.getDeliveryTag()).ack();
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org