You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/02/08 20:40:52 UTC
svn commit: r1782255 - in
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0:
./ type/transaction/
Author: rgodfrey
Date: Wed Feb 8 20:40:51 2017
New Revision: 1782255
URL: http://svn.apache.org/viewvc?rev=1782255&view=rev
Log:
QPID-7667 : Implement multi-session spanning transactions
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java (with props)
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Wed Feb 8 20:40:51 2017
@@ -20,6 +20,7 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Iterator;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.txn.ServerTransaction;
@ManagedObject(category = false, creatable = false, type="AMQP_1_0")
public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQPConnection<C>,
@@ -62,4 +64,9 @@ public interface AMQPConnection_1_0<C ex
boolean isClosed();
void close(Error error);
+
+ Iterator<IdentifiedTransaction> getOpenTransactions();
+ IdentifiedTransaction createLocalTransaction();
+ ServerTransaction getTransaction(int txnId);
+ void removeTransaction(int txnId);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Wed Feb 8 20:40:51 2017
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
@@ -76,7 +77,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -106,14 +106,16 @@ import org.apache.qpid.server.session.AM
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.util.Functions;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.server.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.util.Functions;
public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
implements FrameOutputHandler,
@@ -236,6 +238,10 @@ public class AMQPConnection_1_0Impl exte
private final Set<AMQPSession<?,?>> _sessionsWithWork =
Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
+
+ // Multi session transactions
+ private volatile ServerTransaction[] _openTransactions = new ServerTransaction[16];
+
AMQPConnection_1_0Impl(final Broker<?> broker,
final ServerNetworkConnection network,
AmqpPort<?> port,
@@ -1723,4 +1729,94 @@ public class AMQPConnection_1_0Impl exte
{
super.initialiseHeartbeating(writerDelay, readerDelay);
}
+
+ @Override
+ public Iterator<IdentifiedTransaction> getOpenTransactions()
+ {
+ return new Iterator<IdentifiedTransaction>()
+ {
+ int _index = 0;
+
+ @Override
+ public boolean hasNext()
+ {
+ for(int i = _index; i < _openTransactions.length; i++)
+ {
+ if(_openTransactions[i] != null)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public IdentifiedTransaction next()
+ {
+ IdentifiedTransaction txn = null;
+ for( ; _index < _openTransactions.length; _index++)
+ {
+ if(_openTransactions[_index] != null)
+ {
+ txn = new IdentifiedTransaction(_index, _openTransactions[_index]);
+ _index++;
+ return txn;
+ }
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove()
+ {
+ _openTransactions[_index] = null;
+ }
+ };
+ }
+
+    /**
+     * Creates a new {@link LocalTransaction} on the message store of this connection's address space
+     * and stores it in the lowest free slot of the open transaction table. The slot index is the
+     * transaction id.
+     * <p>
+     * When every slot is occupied the table is replaced by a larger one: twice the size while it holds
+     * fewer than 1024 slots, and 1024 slots larger after that.
+     *
+     * @return the new transaction together with the id it was stored under
+     */
+    @Override
+    public IdentifiedTransaction createLocalTransaction()
+    {
+        final ServerTransaction[] openTransactions = _openTransactions;
+        final int maxOpenTransactions = openTransactions.length;
+
+        // find the lowest unoccupied slot; id == maxOpenTransactions means the table is full
+        int id = 0;
+        while(id < maxOpenTransactions && openTransactions[id] != null)
+        {
+            id++;
+        }
+
+        // we need to expand the transaction array
+        if(id == maxOpenTransactions)
+        {
+            final int newSize = maxOpenTransactions < 1024 ? 2*maxOpenTransactions : maxOpenTransactions + 1024;
+
+            // copy the existing entries before publishing the new array through the volatile field
+            final ServerTransaction[] expanded = new ServerTransaction[newSize];
+            System.arraycopy(openTransactions, 0, expanded, 0, maxOpenTransactions);
+            _openTransactions = expanded;
+        }
+
+        final LocalTransaction serverTransaction =
+                new LocalTransaction(getAddressSpace().getMessageStore());
+        _openTransactions[id] = serverTransaction;
+        return new IdentifiedTransaction(id, serverTransaction);
+    }
+
+    /**
+     * Looks up an open transaction by its id (its index in the open transaction table).
+     *
+     * @param txnId the id to look up
+     * @return the transaction stored under {@code txnId}, or {@code null} when the slot is empty or
+     *         {@code txnId} lies outside the table
+     */
+    @Override
+    public ServerTransaction getTransaction(final int txnId)
+    {
+        final ServerTransaction[] openTransactions = _openTransactions;
+        // an id outside the table cannot name an open transaction; report it like an empty slot
+        if(txnId < 0 || txnId >= openTransactions.length)
+        {
+            return null;
+        }
+        return openTransactions[txnId];
+    }
+
+    /**
+     * Clears the slot of the open transaction table identified by {@code txnId}. The transaction
+     * itself is not committed or rolled back here.
+     *
+     * @param txnId the id whose slot should be cleared; ids outside the table are ignored
+     */
+    @Override
+    public void removeTransaction(final int txnId)
+    {
+        final ServerTransaction[] openTransactions = _openTransactions;
+        // nothing is stored under an id outside the table, so there is nothing to clear
+        if(txnId >= 0 && txnId < openTransactions.length)
+        {
+            openTransactions[txnId] = null;
+        }
+    }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Feb 8 20:40:51 2017
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -239,6 +240,10 @@ class ConsumerTarget_1_0 extends Abstrac
}
});
}
+ else
+ {
+ // TODO - deal with the case of an invalid txn id
+ }
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
@@ -358,24 +363,29 @@ class ConsumerTarget_1_0 extends Abstrac
Binary transactionId = null;
final Outcome outcome;
+ ServerTransaction txn;
// If disposition is settled this overrides the txn?
if(state instanceof TransactionalState)
{
transactionId = ((TransactionalState)state).getTxnId();
outcome = ((TransactionalState)state).getOutcome();
+ txn = _link.getTransaction(transactionId);
+ if(txn == null)
+ {
+ // TODO - invalid txn id supplied
+ }
}
else if (state instanceof Outcome)
{
outcome = (Outcome) state;
+ txn = new AutoCommitTransaction(getSession().getConnection().getAddressSpace().getMessageStore());
}
else
{
outcome = null;
+ txn = null;
}
-
- ServerTransaction txn = _link.getTransaction(transactionId);
-
if(outcome instanceof Accepted)
{
if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java?rev=1782255&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java Wed Feb 8 20:40:51 2017
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+
+/**
+ * Immutable holder pairing a {@link ServerTransaction} with an integer identifier.
+ */
+public class IdentifiedTransaction
+{
+    private final ServerTransaction _serverTransaction;
+    private final int _id;
+
+    /**
+     * @param id the identifier reported by {@link #getId()}
+     * @param serverTransaction the transaction reported by {@link #getServerTransaction()}
+     */
+    public IdentifiedTransaction(final int id, final ServerTransaction serverTransaction)
+    {
+        this._serverTransaction = serverTransaction;
+        this._id = id;
+    }
+
+    /**
+     * @return the transaction supplied at construction
+     */
+    public ServerTransaction getServerTransaction()
+    {
+        return this._serverTransaction;
+    }
+
+    /**
+     * @return the identifier supplied at construction
+     */
+    public int getId()
+    {
+        return this._id;
+    }
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Feb 8 20:40:51 2017
@@ -35,7 +35,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -117,7 +116,6 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -131,12 +129,8 @@ public class Session_1_0 extends Abstrac
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private static final EnumSet<SessionState> END_STATES =
EnumSet.of(SessionState.END_RECVD, SessionState.END_PIPE, SessionState.END_SENT, SessionState.ENDED);
- private AutoCommitTransaction _transaction;
- private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
- new LinkedHashMap<Integer, ServerTransaction>();
-
- private final AMQPConnection_1_0 _connection;
+ private final AMQPConnection_1_0<?> _connection;
private AtomicBoolean _closed = new AtomicBoolean();
private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
@@ -150,6 +144,8 @@ public class Session_1_0 extends Abstrac
private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
+ private final List<TxnCoordinatorReceivingLink_1_0> _txnCoordinatorLinks = new ArrayList<>();
+
private short _receivingChannel;
private final short _sendingChannel;
@@ -940,8 +936,8 @@ public class Session_1_0 extends Abstrac
final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
this,
- receivingLinkEndpoint,
- _openTransactions);
+ receivingLinkEndpoint
+ );
receivingLinkEndpoint.setLink(coordinatorLink);
return coordinatorLink;
}
@@ -1469,30 +1465,12 @@ public class Session_1_0 extends Abstrac
ServerTransaction getTransaction(Binary transactionId)
{
-
- ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
- if(transactionId == null)
- {
- if(_transaction == null)
- {
- _transaction = new AutoCommitTransaction(_connection.getAddressSpace().getMessageStore());
- }
- transaction = _transaction;
- }
- return transaction;
+ // TODO - deal with the case where the txn id is invalid
+ return _connection.getTransaction(binaryToInteger(transactionId));
}
void remoteEnd(End end)
{
- // TODO - if the end has a non empty error we should log it
- Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
-
- while(iter.hasNext())
- {
- Map.Entry<Integer, ServerTransaction> entry = iter.next();
- entry.getValue().rollback();
- iter.remove();
- }
for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints())
{
@@ -1796,7 +1774,7 @@ public class Session_1_0 extends Abstrac
_sendingChannel) + "] ";
}
- public AMQPConnection_1_0 getConnection()
+ public AMQPConnection_1_0<?> getConnection()
{
return _connection;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Wed Feb 8 20:40:51 2017
@@ -235,10 +235,7 @@ public class StandardReceivingLink_1_0 i
}
else
{
- Session_1_0 session = getSession();
- transaction = session != null
- ? session.getTransaction(null)
- : new AutoCommitTransaction(_addressSpace.getMessageStore());
+ transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
}
try
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Wed Feb 8 20:40:51 2017
@@ -21,9 +21,9 @@
package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,9 +40,11 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -59,19 +61,19 @@ public class TxnCoordinatorReceivingLink
private ArrayList<Transfer> _incompleteMessage;
private SectionDecoder _sectionDecoder;
- private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
+ private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
private Session_1_0 _session;
public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
- Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
- LinkedHashMap<Integer, ServerTransaction> openTransactions)
+ Session_1_0 session_1_0,
+ ReceivingLinkEndpoint endpoint)
{
_namedAddressSpace = namedAddressSpace;
_session = session_1_0;
_endpoint = endpoint;
_sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
- _openTransactions = openTransactions;
+ ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
}
public Error messageTransfer(Transfer xfr)
@@ -127,21 +129,14 @@ public class TxnCoordinatorReceivingLink
if(command instanceof Declare)
{
- Integer txnId = Integer.valueOf(0);
- Iterator<Integer> existingTxn = _openTransactions.keySet().iterator();
- while(existingTxn.hasNext())
- {
- txnId = existingTxn.next();
- }
- txnId = Integer.valueOf(txnId.intValue() + 1);
-
- _openTransactions.put(txnId, new LocalTransaction(_namedAddressSpace.getMessageStore()));
+ final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
+ _createdTransactions.put(txn.getId(), txn.getServerTransaction());
Declared state = new Declared();
_session.incrementStartedTransactions();
- state.setTxnId(_session.integerToBinary(txnId));
+ state.setTxnId(_session.integerToBinary(txn.getId()));
_endpoint.updateDisposition(deliveryTag, state, true);
}
@@ -180,9 +175,17 @@ public class TxnCoordinatorReceivingLink
public void remoteDetached(LinkEndpoint endpoint, Detach detach)
{
+ // On detach, roll back every transaction created through this link that is still recorded
+ // here, count each rollback on the session, and release its id on the connection.
+ for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
+ {
+ entry.getValue().rollback();
+ _session.incrementRolledBackTransactions();
+ _session.getConnection().removeTransaction(entry.getKey());
+ }
+ // Forget the rolled back transactions so that a repeated detach cannot roll them back again
+ // or release ids that the connection may have handed out again in the meantime.
+ _createdTransactions.clear();
endpoint.detach();
}
+
@Override
public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
@@ -192,7 +195,7 @@ public class TxnCoordinatorReceivingLink
private Error discharge(Integer transactionId, boolean fail)
{
Error error = null;
- ServerTransaction txn = _openTransactions.get(transactionId);
+ ServerTransaction txn = _createdTransactions.get(transactionId);
if(txn != null)
{
if(fail)
@@ -212,11 +215,9 @@ public class TxnCoordinatorReceivingLink
error = new Error();
error.setCondition(LinkError.DETACH_FORCED);
error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
- _openTransactions.remove(transactionId);
-
- return error;
}
- _openTransactions.remove(transactionId);
+ _createdTransactions.remove(transactionId);
+ _session.getConnection().removeTransaction(transactionId);
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java Wed Feb 8 20:40:51 2017
@@ -42,7 +42,7 @@ public class Coordinator
return _capabilities;
}
- public void setCapabilities(TxnCapability[] capabilities)
+ public void setCapabilities(TxnCapability... capabilities)
{
_capabilities = capabilities;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org