You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/02/08 20:40:52 UTC

svn commit: r1782255 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: ./ type/transaction/

Author: rgodfrey
Date: Wed Feb  8 20:40:51 2017
New Revision: 1782255

URL: http://svn.apache.org/viewvc?rev=1782255&view=rev
Log:
QPID-7667 : Implement multi-session spanning transactions

Added:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java   (with props)
Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Wed Feb  8 20:40:51 2017
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.txn.ServerTransaction;
 
 @ManagedObject(category = false, creatable = false, type="AMQP_1_0")
 public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQPConnection<C>,
@@ -62,4 +64,9 @@ public interface AMQPConnection_1_0<C ex
     boolean isClosed();
 
     void close(Error error);
+
+    Iterator<IdentifiedTransaction> getOpenTransactions();
+    IdentifiedTransaction createLocalTransaction();
+    ServerTransaction getTransaction(int txnId);
+    void removeTransaction(int txnId);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Wed Feb  8 20:40:51 2017
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
 import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
@@ -76,7 +77,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -106,14 +106,16 @@ import org.apache.qpid.server.session.AM
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.util.Functions;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.server.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.util.Functions;
 
 public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
         implements FrameOutputHandler,
@@ -236,6 +238,10 @@ public class AMQPConnection_1_0Impl exte
     private final Set<AMQPSession<?,?>> _sessionsWithWork =
             Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
 
+
+    // Multi session transactions
+    private volatile ServerTransaction[] _openTransactions = new ServerTransaction[16];
+
     AMQPConnection_1_0Impl(final Broker<?> broker,
                            final ServerNetworkConnection network,
                            AmqpPort<?> port,
@@ -1723,4 +1729,94 @@ public class AMQPConnection_1_0Impl exte
     {
         super.initialiseHeartbeating(writerDelay, readerDelay);
     }
+
+    @Override
+    public Iterator<IdentifiedTransaction> getOpenTransactions()
+    {
+        // Walks the non-null slots of _openTransactions in index order; a slot's index is the transaction id.
+        return new Iterator<IdentifiedTransaction>()
+        {
+            int _index = 0;          // next slot to examine
+            int _lastReturned = -1;  // slot handed out by the latest next(); -1 when nothing can be removed
+
+            @Override
+            public boolean hasNext()
+            {
+                // skip free (null) slots so that _index rests on the next open transaction, if any
+                while(_index < _openTransactions.length && _openTransactions[_index] == null)
+                {
+                    _index++;
+                }
+                return _index < _openTransactions.length;
+            }
+
+            @Override
+            public IdentifiedTransaction next()
+            {
+                if(!hasNext())
+                {
+                    throw new NoSuchElementException();
+                }
+                _lastReturned = _index++;
+                return new IdentifiedTransaction(_lastReturned, _openTransactions[_lastReturned]);
+            }
+
+            @Override
+            public void remove()
+            {
+                // clear the slot that next() returned, not the slot the cursor has since moved on to
+                if(_lastReturned < 0)
+                {
+                    throw new IllegalStateException();
+                }
+                _openTransactions[_lastReturned] = null;
+                _lastReturned = -1;
+            }
+        };
+    }
+
+    @Override
+    public IdentifiedTransaction createLocalTransaction()
+    {
+        // Registers a new LocalTransaction in the lowest free slot; the slot index becomes the transaction id.
+        ServerTransaction[] openTransactions = _openTransactions;
+        final int maxOpenTransactions = openTransactions.length;
+        int id = 0;
+        for(; id < maxOpenTransactions; id++)
+        {
+            if(openTransactions[id] == null)
+            {
+                break;
+            }
+        }
+
+        // no free slot: grow the array, doubling while below 1024 slots and adding 1024 slots thereafter
+        if(id == maxOpenTransactions)
+        {
+            final int newSize = maxOpenTransactions < 1024 ? 2*maxOpenTransactions : maxOpenTransactions + 1024;
+            // copy the existing entries before publishing, so the field never refers to an array missing them
+            final ServerTransaction[] expanded = new ServerTransaction[newSize];
+            System.arraycopy(openTransactions, 0, expanded, 0, maxOpenTransactions);
+            _openTransactions = expanded;
+        }
+
+        final LocalTransaction serverTransaction =
+                new LocalTransaction(getAddressSpace().getMessageStore());
+        _openTransactions[id] = serverTransaction;
+        return new IdentifiedTransaction(id, serverTransaction);
+    }
+
+    @Override
+    public ServerTransaction getTransaction(final int txnId)
+    {
+        // Ids are array indices; an id outside the array cannot name an open transaction, so report null
+        return (txnId >= 0 && txnId < _openTransactions.length) ? _openTransactions[txnId] : null;
+    }
+
+    @Override
+    public void removeTransaction(final int txnId)
+    {
+        // TODO - bounds check: txnId is used directly as an array index, so an id outside the array throws
+        _openTransactions[txnId] = null;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Feb  8 20:40:51 2017
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
@@ -239,6 +240,10 @@ class ConsumerTarget_1_0 extends Abstrac
                             }
                         });
                     }
+                    else
+                    {
+                        // TODO - deal with the case of an invalid txn id
+                    }
 
                 }
                 getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
@@ -358,24 +363,29 @@ class ConsumerTarget_1_0 extends Abstrac
 
             Binary transactionId = null;
             final Outcome outcome;
+            ServerTransaction txn;
             // If disposition is settled this overrides the txn?
             if(state instanceof TransactionalState)
             {
                 transactionId = ((TransactionalState)state).getTxnId();
                 outcome = ((TransactionalState)state).getOutcome();
+                txn = _link.getTransaction(transactionId);
+                if(txn == null)
+                {
+                    // TODO - invalid txn id supplied
+                }
             }
             else if (state instanceof Outcome)
             {
                 outcome = (Outcome) state;
+                txn = new AutoCommitTransaction(getSession().getConnection().getAddressSpace().getMessageStore());
             }
             else
             {
                 outcome = null;
+                txn = null;
             }
 
-
-            ServerTransaction txn = _link.getTransaction(transactionId);
-
             if(outcome instanceof Accepted)
             {
                 if (_queueEntry.makeAcquisitionUnstealable(getConsumer()))

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java?rev=1782255&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java Wed Feb  8 20:40:51 2017
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+/** Pairs a {@link ServerTransaction} with the integer id it is identified by; both are fixed at construction. */
+public class IdentifiedTransaction
+{
+    private final int _id; // identifier handed out together with the transaction
+    private final ServerTransaction _serverTransaction; // the transaction the id refers to
+
+    public IdentifiedTransaction(final int id, final ServerTransaction serverTransaction)
+    {
+        _id = id;
+        _serverTransaction = serverTransaction;
+    }
+
+    public int getId()
+    {
+        return _id;
+    }
+
+    public ServerTransaction getServerTransaction()
+    {
+        return _serverTransaction;
+    }
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/IdentifiedTransaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Feb  8 20:40:51 2017
@@ -35,7 +35,6 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -117,7 +116,6 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.session.AbstractAMQPSession;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -131,12 +129,8 @@ public class Session_1_0 extends Abstrac
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private static final EnumSet<SessionState> END_STATES =
             EnumSet.of(SessionState.END_RECVD, SessionState.END_PIPE, SessionState.END_SENT, SessionState.ENDED);
-    private AutoCommitTransaction _transaction;
 
-    private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
-            new LinkedHashMap<Integer, ServerTransaction>();
-
-    private final AMQPConnection_1_0 _connection;
+    private final AMQPConnection_1_0<?> _connection;
     private AtomicBoolean _closed = new AtomicBoolean();
 
     private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
@@ -150,6 +144,8 @@ public class Session_1_0 extends Abstrac
     private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
     private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
 
+    private final List<TxnCoordinatorReceivingLink_1_0> _txnCoordinatorLinks = new ArrayList<>();
+
     private short _receivingChannel;
     private final short _sendingChannel;
 
@@ -940,8 +936,8 @@ public class Session_1_0 extends Abstrac
         final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
                 new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
                                                     this,
-                                                    receivingLinkEndpoint,
-                                                    _openTransactions);
+                                                    receivingLinkEndpoint
+                );
         receivingLinkEndpoint.setLink(coordinatorLink);
         return coordinatorLink;
     }
@@ -1469,30 +1465,12 @@ public class Session_1_0 extends Abstrac
 
     ServerTransaction getTransaction(Binary transactionId)
     {
-
-        ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
-        if(transactionId == null)
-        {
-            if(_transaction == null)
-            {
-                _transaction = new AutoCommitTransaction(_connection.getAddressSpace().getMessageStore());
-            }
-            transaction = _transaction;
-        }
-        return transaction;
+        // TODO - deal with the case where the txn id is invalid
+        return _connection.getTransaction(binaryToInteger(transactionId));
     }
 
     void remoteEnd(End end)
     {
-        // TODO - if the end has a non empty error we should log it
-        Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
-
-        while(iter.hasNext())
-        {
-            Map.Entry<Integer, ServerTransaction> entry = iter.next();
-            entry.getValue().rollback();
-            iter.remove();
-        }
 
         for(LinkEndpoint linkEndpoint : getLocalLinkEndpoints())
         {
@@ -1796,7 +1774,7 @@ public class Session_1_0 extends Abstrac
                                     _sendingChannel) + "] ";
     }
 
-    public AMQPConnection_1_0 getConnection()
+    public AMQPConnection_1_0<?> getConnection()
     {
         return _connection;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Wed Feb  8 20:40:51 2017
@@ -235,10 +235,7 @@ public class StandardReceivingLink_1_0 i
                 }
                 else
                 {
-                    Session_1_0 session = getSession();
-                    transaction = session != null
-                            ? session.getTransaction(null)
-                            : new AutoCommitTransaction(_addressSpace.getMessageStore());
+                    transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
                 }
 
                 try

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Wed Feb  8 20:40:51 2017
@@ -21,9 +21,9 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +40,11 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -59,19 +61,19 @@ public class TxnCoordinatorReceivingLink
 
     private ArrayList<Transfer> _incompleteMessage;
     private SectionDecoder _sectionDecoder;
-    private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
+    private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
     private Session_1_0 _session;
 
 
     public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
-                                           Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
-                                           LinkedHashMap<Integer, ServerTransaction> openTransactions)
+                                           Session_1_0 session_1_0,
+                                           ReceivingLinkEndpoint endpoint)
     {
         _namedAddressSpace = namedAddressSpace;
         _session = session_1_0;
         _endpoint  = endpoint;
         _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
-        _openTransactions = openTransactions;
+        ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
     }
 
     public Error messageTransfer(Transfer xfr)
@@ -127,21 +129,14 @@ public class TxnCoordinatorReceivingLink
 
                     if(command instanceof Declare)
                     {
-                        Integer txnId = Integer.valueOf(0);
-                        Iterator<Integer> existingTxn  = _openTransactions.keySet().iterator();
-                        while(existingTxn.hasNext())
-                        {
-                            txnId = existingTxn.next();
-                        }
-                        txnId = Integer.valueOf(txnId.intValue() + 1);
-
-                        _openTransactions.put(txnId, new LocalTransaction(_namedAddressSpace.getMessageStore()));
+                        final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
+                        _createdTransactions.put(txn.getId(), txn.getServerTransaction());
 
                         Declared state = new Declared();
 
                         _session.incrementStartedTransactions();
 
-                        state.setTxnId(_session.integerToBinary(txnId));
+                        state.setTxnId(_session.integerToBinary(txn.getId()));
                         _endpoint.updateDisposition(deliveryTag, state, true);
 
                     }
@@ -180,9 +175,17 @@ public class TxnCoordinatorReceivingLink
 
     public void remoteDetached(LinkEndpoint endpoint, Detach detach)
     {
+        // force rollback of open transactions
+        for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
+        {
+            entry.getValue().rollback();
+            _session.incrementRolledBackTransactions();
+            _session.getConnection().removeTransaction(entry.getKey());
+        }
         endpoint.detach();
     }
 
+
     @Override
     public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
@@ -192,7 +195,7 @@ public class TxnCoordinatorReceivingLink
     private Error discharge(Integer transactionId, boolean fail)
     {
         Error error = null;
-        ServerTransaction txn = _openTransactions.get(transactionId);
+        ServerTransaction txn = _createdTransactions.get(transactionId);
         if(txn != null)
         {
             if(fail)
@@ -212,11 +215,9 @@ public class TxnCoordinatorReceivingLink
                 error = new Error();
                 error.setCondition(LinkError.DETACH_FORCED);
                 error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
-                _openTransactions.remove(transactionId);
-
-                return error;
             }
-            _openTransactions.remove(transactionId);
+            _createdTransactions.remove(transactionId);
+            _session.getConnection().removeTransaction(transactionId);
         }
         else
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java?rev=1782255&r1=1782254&r2=1782255&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java Wed Feb  8 20:40:51 2017
@@ -42,7 +42,7 @@ public class Coordinator
         return _capabilities;
     }
 
-    public void setCapabilities(TxnCapability[] capabilities)
+    public void setCapabilities(TxnCapability... capabilities)
     {
         _capabilities = capabilities;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org