You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/02/16 16:11:27 UTC

svn commit: r1783244 [1/2] - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: lquack
Date: Thu Feb 16 16:11:27 2017
New Revision: 1783244

URL: http://svn.apache.org/viewvc?rev=1783244&view=rev
Log:
QPID-7656: [Java Broker] Move functionality from Link to LinkEndpoint

Added:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
      - copied, changed from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
      - copied, changed from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Removed:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Feb 16 16:11:27 2017
@@ -70,41 +70,40 @@ class ConsumerTarget_1_0 extends Abstrac
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
     private final boolean _acquires;
-    private final SendingLink_1_0 _link;
 
     private long _deliveryTag = 0L;
 
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
+    private SendingLinkEndpoint _linkEndpoint;
     private final SectionEncoder _sectionEncoder;
     private boolean _queueEmpty;
 
-    public ConsumerTarget_1_0(final SendingLink_1_0 link,
-                              boolean acquires)
+    public ConsumerTarget_1_0(final SendingLinkEndpoint linkEndpoint, boolean acquires)
     {
-        super(false, link.getSession().getAMQPConnection());
-        _link = link;
-        _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
+        super(false, linkEndpoint.getSession().getAMQPConnection());
+        _typeRegistry = linkEndpoint.getSession().getConnection().getDescribedTypeRegistry();
+        _linkEndpoint = linkEndpoint;
         _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
         _acquires = acquires;
     }
 
     private SendingLinkEndpoint getEndpoint()
     {
-        return _link.getEndpoint();
+        return _linkEndpoint;
     }
 
     @Override
     public void updateNotifyWorkDesired()
     {
         boolean state = false;
-        Session_1_0 session = _link.getSession();
+        Session_1_0 session = _linkEndpoint.getSession();
         if (session != null)
         {
             final AMQPConnection<?> amqpConnection = session.getAMQPConnection();
 
             state = !amqpConnection.isTransportBlockedForWriting()
-                    && _link.isAttached()
+                    && _linkEndpoint.isAttached()
                     && getEndpoint().hasCreditToSend();
         }
         setNotifyWorkDesired(state);
@@ -125,7 +124,7 @@ class ConsumerTarget_1_0 extends Abstrac
         {
             converter =
                     (MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
-            message = converter.convert(serverMessage, _link.getAddressSpace());
+            message = converter.convert(serverMessage, _linkEndpoint.getAddressSpace());
         }
 
         Transfer transfer = new Transfer();
@@ -199,7 +198,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
             transfer.setDeliveryTag(tag);
 
-            if (_link.isAttached())
+            if (_linkEndpoint.isAttached())
             {
                 if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
                 {
@@ -209,9 +208,9 @@ class ConsumerTarget_1_0 extends Abstrac
                 {
                     UnsettledAction action = _acquires
                             ? new DispositionAction(tag, entry, consumer)
-                            : new DoNothingAction(tag, entry);
+                            : new DoNothingAction();
 
-                    _link.addUnsettled(tag, action, entry);
+                    _linkEndpoint.addUnsettled(tag, action, entry);
                 }
 
                 if (_transactionId != null)
@@ -223,7 +222,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 // TODO - need to deal with failure here
                 if (_acquires && _transactionId != null)
                 {
-                    ServerTransaction txn = _link.getTransaction(_transactionId);
+                    ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);
                     if (txn != null)
                     {
                         txn.addPostTransactionAction(new ServerTransaction.Action()
@@ -236,7 +235,7 @@ class ConsumerTarget_1_0 extends Abstrac
                             public void onRollback()
                             {
                                 entry.release(consumer);
-                                _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
+                                _linkEndpoint.updateDisposition(tag, (DeliveryState) null, true);
                             }
                         });
                     }
@@ -289,14 +288,13 @@ class ConsumerTarget_1_0 extends Abstrac
     public boolean allocateCredit(final ServerMessage msg)
     {
         ProtocolEngine protocolEngine = getSession().getConnection();
-        final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+        final boolean hasCredit = _linkEndpoint.isAttached() && getEndpoint().hasCreditToSend();
 
         updateNotifyWorkDesired();
 
         if (hasCredit)
         {
-            SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
-            linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
+            _linkEndpoint.setLinkCredit(_linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
         }
 
         return hasCredit;
@@ -305,14 +303,13 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public void restoreCredit(final ServerMessage message)
     {
-        final SendingLinkEndpoint endpoint = _link.getEndpoint();
-        endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
+        _linkEndpoint.setLinkCredit(_linkEndpoint.getLinkCredit().add(UnsignedInteger.ONE));
         updateNotifyWorkDesired();
     }
 
     public void queueEmpty()
     {
-        if(_link.drained())
+        if(_linkEndpoint.drained())
         {
             updateNotifyWorkDesired();
         }
@@ -324,14 +321,14 @@ class ConsumerTarget_1_0 extends Abstrac
 
         if (isSuspended() && getEndpoint() != null)
         {
-            _transactionId = _link.getTransactionId();
+            _transactionId = _linkEndpoint.getTransactionId();
         }
     }
 
     @Override
     public Session_1_0 getSession()
     {
-        return _link.getSession();
+        return _linkEndpoint.getSession();
     }
 
     public void flush()
@@ -369,7 +366,7 @@ class ConsumerTarget_1_0 extends Abstrac
             {
                 transactionId = ((TransactionalState)state).getTxnId();
                 outcome = ((TransactionalState)state).getOutcome();
-                txn = _link.getTransaction(transactionId);
+                txn = _linkEndpoint.getTransaction(transactionId);
                 if(txn == null)
                 {
                     // TODO - invalid txn id supplied
@@ -416,13 +413,13 @@ class ConsumerTarget_1_0 extends Abstrac
                         {
                             if(Boolean.TRUE.equals(settled))
                             {
-                                _link.getEndpoint().settle(_deliveryTag);
+                                _linkEndpoint.settle(_deliveryTag);
                             }
                             else
                             {
-                                _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+                                _linkEndpoint.updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
                             }
-                            _link.getEndpoint().sendFlowConditional();
+                            _linkEndpoint.sendFlowConditional();
                         }
 
                         @Override
@@ -445,13 +442,13 @@ class ConsumerTarget_1_0 extends Abstrac
                     {
 
                         _queueEntry.release(getConsumer());
-                        _link.getEndpoint().settle(_deliveryTag);
+                        _linkEndpoint.settle(_deliveryTag);
                     }
 
                     @Override
                     public void onRollback()
                     {
-                        _link.getEndpoint().settle(_deliveryTag);
+                        _linkEndpoint.settle(_deliveryTag);
 
                         // TODO: apply source's default outcome if settled
                     }
@@ -475,7 +472,7 @@ class ConsumerTarget_1_0 extends Abstrac
                         {
                             _queueEntry.release(getConsumer());
                         }
-                        _link.getEndpoint().settle(_deliveryTag);
+                        _linkEndpoint.settle(_deliveryTag);
                     }
 
                     @Override
@@ -496,9 +493,9 @@ class ConsumerTarget_1_0 extends Abstrac
                     @Override
                     public void postCommit()
                     {
-                        _link.getEndpoint().settle(_deliveryTag);
+                        _linkEndpoint.settle(_deliveryTag);
                         incrementDeliveryCountOrRouteToAlternateOrDiscard();
-                        _link.getEndpoint().sendFlowConditional();
+                        _linkEndpoint.sendFlowConditional();
                     }
 
                     @Override
@@ -520,8 +517,8 @@ class ConsumerTarget_1_0 extends Abstrac
         {
             final Modified modified = new Modified();
             modified.setDeliveryFailed(true);
-            _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
-            _link.getEndpoint().sendFlowConditional();
+            _linkEndpoint.updateDisposition(_deliveryTag, modified, true);
+            _linkEndpoint.sendFlowConditional();
             incrementDeliveryCountOrRouteToAlternateOrDiscard();
         }
 
@@ -541,7 +538,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
         private void routeToAlternateOrDiscard()
         {
-            final Session_1_0 session = _link.getSession();
+            final Session_1_0 session = _linkEndpoint.getSession();
             final ServerMessage message = _queueEntry.getMessage();
             final EventLogger eventLogger = session.getEventLogger();
             final LogSubject logSubject = session.getLogSubject();
@@ -590,25 +587,12 @@ class ConsumerTarget_1_0 extends Abstrac
 
     private class DoNothingAction implements UnsettledAction
     {
-        public DoNothingAction(final Binary tag,
-                               final MessageInstance queueEntry)
+        public DoNothingAction()
         {
         }
 
         public boolean process(final DeliveryState state, final Boolean settled)
         {
-            Binary transactionId = null;
-            Outcome outcome = null;
-            // If disposition is settled this overrides the txn?
-            if(state instanceof TransactionalState)
-            {
-                transactionId = ((TransactionalState)state).getTxnId();
-                outcome = ((TransactionalState)state).getOutcome();
-            }
-            else if (state instanceof Outcome)
-            {
-                outcome = (Outcome) state;
-            }
             return true;
         }
     }
@@ -621,9 +605,9 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     public String getTargetAddress()
     {
-        BaseTarget target = _link.getEndpoint().getTarget();
+        BaseTarget target = _linkEndpoint.getTarget();
 
-        return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _link.getEndpoint().getName();
+        return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getName();
     }
 
     @Override
@@ -643,6 +627,6 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     public String toString()
     {
-        return "ConsumerTarget_1_0[linkSession=" + _link.getSession().toLogString() + "]";
+        return "ConsumerTarget_1_0[linkSession=" + _linkEndpoint.getSession().toLogString() + "]";
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -24,10 +24,10 @@ package org.apache.qpid.server.protocol.
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -57,7 +57,7 @@ public abstract class LinkEndpoint<T ext
     private volatile boolean _stoppedUpdated;
     private Symbol[] _capabilities;
 
-    private enum State
+    protected enum State
     {
         DETACHED,
         ATTACH_SENT,
@@ -72,7 +72,7 @@ public abstract class LinkEndpoint<T ext
     private Session_1_0 _session;
 
 
-    private volatile State _state = State.DETACHED;
+    protected volatile State _state = State.DETACHED;
 
     private BaseSource _source;
     private BaseTarget _target;
@@ -84,8 +84,6 @@ public abstract class LinkEndpoint<T ext
     private UnsignedLong _maxMessageSize;
     private Map<Symbol, Object> _properties;
 
-    private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
-
     LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
     {
         _session = sessionEndpoint;
@@ -96,6 +94,8 @@ public abstract class LinkEndpoint<T ext
         _state = State.ATTACH_RECVD;
     }
 
+    public abstract void start();
+
     public boolean isStopped()
     {
         return _stopped;
@@ -125,6 +125,11 @@ public abstract class LinkEndpoint<T ext
         return _source;
     }
 
+    public NamedAddressSpace getAddressSpace()
+    {
+        return getSession().getConnection().getAddressSpace();
+    }
+
     public void setSource(final BaseSource source)
     {
         _source = source;
@@ -189,29 +194,26 @@ public abstract class LinkEndpoint<T ext
                 break;
             case ATTACHED:
                 _state = State.DETACH_RECVD;
-                _link.remoteDetached(LinkEndpoint.this, detach);
+                remoteDetachedPerformDetach(detach);
                 break;
         }
     }
 
+    protected abstract void remoteDetachedPerformDetach(final Detach detach);
+
     public void receiveFlow(final Flow flow)
     {
     }
 
     public void addUnsettled(final Delivery unsettled)
     {
-        _unsettledTransfers.put(unsettled.getDeliveryTag(), unsettled);
     }
 
     public void receiveDeliveryState(final Delivery unsettled,
                                      final DeliveryState state,
                                      final Boolean settled)
     {
-        // TODO
-        if (_link != null)
-        {
-            _link.handle(unsettled.getDeliveryTag(), state, settled);
-        }
+        handle(unsettled.getDeliveryTag(), state, settled);
 
         if (Boolean.TRUE.equals(settled))
         {
@@ -219,9 +221,11 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
+    protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
     public void settle(final Binary deliveryTag)
     {
-        _unsettledTransfers.remove(deliveryTag);
+
     }
 
     void setLocalHandle(final UnsignedInteger localHandle)
@@ -289,6 +293,11 @@ public abstract class LinkEndpoint<T ext
         return _session;
     }
 
+    public void setSession(final Session_1_0 session)
+    {
+        _session = session;
+    }
+
     UnsignedInteger getLocalHandle()
     {
         return _localHandle;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Thu Feb 16 16:11:27 2017
@@ -28,9 +28,4 @@ import org.apache.qpid.server.protocol.v
 public interface Link_1_0 extends LinkModel
 {
 
-    void remoteDetached(final LinkEndpoint endpoint, Detach detach);
-
-    void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
-    void start();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -37,11 +37,12 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
-public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
+public abstract class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
 {
 
 
     private UnsignedInteger _lastDeliveryId;
+    private ReceivingDestination _receivingDestination;
 
     private static class TransientState
     {
@@ -144,7 +145,7 @@ public class ReceivingLinkEndpoint exten
             {
                 _unsettledMap.remove(deliveryTag);
             }
-            return getLink().messageTransfer(transfer);
+            return messageTransfer(transfer);
         }
         else
         {
@@ -153,6 +154,18 @@ public class ReceivingLinkEndpoint exten
         }
     }
 
+    protected abstract Error messageTransfer(final Transfer transfer);
+
+    public ReceivingDestination getReceivingDestination()
+    {
+        return _receivingDestination;
+    }
+
+    public void setDestination(final ReceivingDestination receivingDestination)
+    {
+        _receivingDestination = receivingDestination;
+    }
+
     @Override public void receiveFlow(final Flow flow)
     {
         super.receiveFlow(flow);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -25,5 +25,5 @@ import org.apache.qpid.server.protocol.v
 
 public interface ReceivingLink_1_0 extends Link_1_0
 {
-    Error messageTransfer(Transfer xfr);
+    void setLinkAttachmentToNull();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -21,39 +21,231 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.security.AccessControlException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NotFoundException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class SendingLinkEndpoint extends LinkEndpoint<SendingLink_1_0>
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
 
     public static final Symbol PRIORITY = Symbol.valueOf("priority");
     private UnsignedInteger _lastDeliveryId;
     private Binary _lastDeliveryTag;
-    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
+    private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
     private Binary _transactionId;
     private Integer _priority;
+    private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
+    private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+    private volatile boolean _draining = false;
+    private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
+    private TerminusDurability _durability;
+    private SendingDestination _destination;
+    private EnumSet<ConsumerOption> _consumerOptions;
+    private FilterManager _consumerFilters;
+    private ConsumerTarget_1_0 _consumerTarget;
+    private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
 
-
-    public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, final Attach attach)
+    public SendingLinkEndpoint(final Session_1_0 session, final Attach attach)
     {
-        super(sessionEndpoint, attach);
+        super(session, attach);
         setSendingSettlementMode(attach.getSndSettleMode());
         setReceivingSettlementMode(attach.getRcvSettleMode());
         init();
     }
 
     @Override
+    public void start()
+    {
+    }
+
+    public void doStuff(final SendingDestination destination) throws AmqpErrorException
+    {
+        _destination = destination;
+        final Source source = (Source) getSource();
+
+        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
+
+        boolean noLocal = false;
+        JMSSelectorFilter messageFilter = null;
+
+        if(destination instanceof ExchangeDestination)
+        {
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
+        }
+        else if(destination instanceof MessageSourceDestination)
+        {
+            MessageSource messageSource = _destination.getMessageSource();
+
+            if(messageSource instanceof Queue && ((Queue<?>)messageSource).getAvailableAttributes().contains("topic"))
+            {
+                source.setDistributionMode(StdDistMode.COPY);
+            }
+
+            Map<Symbol,Filter> filters = source.getFilter();
+
+            Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
+
+            if(filters != null)
+            {
+                for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+                {
+                    if(entry.getValue() instanceof NoLocalFilter)
+                    {
+                        actualFilters.put(entry.getKey(), entry.getValue());
+                        noLocal = true;
+                    }
+                    else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+                    {
+
+                        org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+                        try
+                        {
+                            messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
+
+                            actualFilters.put(entry.getKey(), entry.getValue());
+                        }
+                        catch (ParseException | SelectorParsingException | TokenMgrError e)
+                        {
+                            Error error = new Error();
+                            error.setCondition(AmqpError.INVALID_FIELD);
+                            error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+                            error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+                            throw new AmqpErrorException(error);
+                        }
+
+
+                    }
+                }
+            }
+            source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
+
+            if(source.getDistributionMode() != StdDistMode.COPY)
+            {
+                options.add(ConsumerOption.ACQUIRES);
+                options.add(ConsumerOption.SEES_REQUEUES);
+            }
+        }
+        else
+        {
+            throw new ConnectionScopedRuntimeException("Unknown destination type");
+        }
+        if(noLocal)
+        {
+            options.add(ConsumerOption.NO_LOCAL);
+        }
+
+        FilterManager filters = null;
+        if(messageFilter != null)
+        {
+            filters = new FilterManager();
+            filters.add(messageFilter.getName(), messageFilter);
+        }
+        _consumerOptions = options;
+        _consumerFilters = filters;
+    }
+
+    void createConsumerTarget() throws AmqpErrorException
+    {
+        final Source source = (Source) getSource();
+        _consumerTarget = new ConsumerTarget_1_0(this,
+                                         _destination instanceof ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
+        try
+        {
+            final String name;
+            if(getTarget() instanceof Target)
+            {
+                Target target = (Target) getTarget();
+                name = target.getAddress() == null ? getName() : target.getAddress();
+            }
+            else
+            {
+                name = getName();
+            }
+
+            _consumer = _destination.getMessageSource()
+                                    .addConsumer(_consumerTarget,
+                                                 _consumerFilters,
+                                                 Message_1_0.class,
+                                                 name,
+                                                 _consumerOptions,
+                                                 getPriority());
+            _consumerTarget.updateNotifyWorkDesired();
+        }
+        catch (MessageSource.ExistingExclusiveConsumer e)
+        {
+            String msg = "Cannot add a consumer to the destination as there is already an exclusive consumer";
+            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+        }
+        catch (MessageSource.ExistingConsumerPreventsExclusive e)
+        {
+            String msg = "Cannot add an exclusive consumer to the destination as there is already a consumer";
+            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+        }
+        catch (MessageSource.ConsumerAccessRefused e)
+        {
+            String msg = "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy";
+            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+        }
+        catch (MessageSource.QueueDeleted e)
+        {
+            String msg = "Cannot add a consumer to the destination as the destination has been deleted";
+            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_DELETED, msg), e);
+        }
+    }
+
+
+    @Override
     protected Map<Symbol, Object> initProperties(final Attach attach)
     {
 
@@ -98,7 +290,8 @@ public class SendingLinkEndpoint extends
         setAvailable(UnsignedInteger.valueOf(0));
     }
 
-    @Override public Role getRole()
+    @Override
+    public Role getRole()
     {
         return Role.SENDER;
     }
@@ -108,6 +301,11 @@ public class SendingLinkEndpoint extends
         return _priority;
     }
 
+    public void setDurability(final TerminusDurability durability)
+    {
+        _durability = durability;
+    }
+
     public boolean transfer(final Transfer xfr, final boolean decrementCredit)
     {
         Session_1_0 s = getSession();
@@ -141,11 +339,20 @@ public class SendingLinkEndpoint extends
     }
 
 
-    public void drained()
+    public boolean drained()
     {
-        setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
-        setLinkCredit(UnsignedInteger.ZERO);
-        sendFlow();
+        if (_draining)
+        {
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+            _draining = false;
+            return true;
+        }
+        else
+        {
+            return false;
+        }
     }
 
     @Override
@@ -185,27 +392,142 @@ public class SendingLinkEndpoint extends
     @Override
     public void flowStateChanged()
     {
-        getLink().flowStateChanged();
+        if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
+        {
+            _draining = true;
+            getConsumerTarget().flush();
+        }
+
+        while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
+        {
+            Accepted accepted = new Accepted();
+            Transfer xfr = new Transfer();
+            Binary dt = _resumeAcceptedTransfers.remove(0);
+            xfr.setDeliveryTag(dt);
+            xfr.setState(accepted);
+            xfr.setResume(Boolean.TRUE);
+            transfer(xfr, true);
+            xfr.dispose();
+        }
+        if(_resumeAcceptedTransfers.isEmpty())
+        {
+            getConsumerTarget().flowStateChanged();
+        }
     }
 
-    public boolean hasCreditToSend()
+
+    @Override
+    protected void remoteDetachedPerformDetach(final Detach detach)
     {
-        UnsignedInteger linkCredit = getLinkCredit();
-        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
-               && getSession().hasCreditToSend();
+        getConsumerTarget().close();
+        //TODO
+        // if not durable or close
+        if (Boolean.TRUE.equals(detach.getClosed())
+            || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals(
+                _durability)))
+        {
+
+            Modified state = new Modified();
+            state.setDeliveryFailed(true);
+
+            for(UnsettledAction action : _unsettledActionMap.values())
+            {
+                action.process(state,Boolean.TRUE);
+            }
+            _unsettledActionMap.clear();
+
+            close();
+
+            if (getDestination() instanceof ExchangeDestination
+               && (_durability == TerminusDurability.CONFIGURATION
+                   || _durability == TerminusDurability.UNSETTLED_STATE))
+            {
+                try
+                {
+                    if (getSession().getConnection().getAddressSpace() instanceof QueueManagingVirtualHost)
+                    {
+                        ((QueueManagingVirtualHost) getSession().getConnection().getAddressSpace()).removeSubscriptionQueue(((ExchangeDestination) getDestination()).getQueue().getName());
+                    }
+                }
+                catch (AccessControlException e)
+                {
+                    LOGGER.error("Error unregistering subscription", e);
+                    detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
+                }
+                catch (IllegalStateException e)
+                {
+                    detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
+                }
+                catch (NotFoundException e)
+                {
+                    detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
+                }
+            }
+        }
+        else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
+        {
+            try
+            {
+                getLink().setLinkAttachment(null, null);
+            }
+            catch (AmqpErrorException e)
+            {
+                throw new ConnectionScopedRuntimeException(e);
+            }
+            getConsumerTarget().flowStateChanged();
+            detach();
+        }
+        else
+        {
+            detach();
+            getConsumerTarget().updateNotifyWorkDesired();
+        }
+    }
+
+    public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
+    {
+        _unsettledActionMap.put(tag, unsettledAction);
+        if(getTransactionId() == null)
+        {
+            _unsettledMap2.put(tag, queueEntry);
+        }
+
     }
 
-    public void receiveDeliveryState(final Delivery unsettled,
-                                               final DeliveryState state,
-                                               final Boolean settled)
+    @Override
+    protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
-        super.receiveDeliveryState(unsettled, state, settled);
-        if(settled)
+        UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+        boolean localSettle = false;
+        if(action != null)
+        {
+            localSettle = action.process(state, settled);
+            if(localSettle && !Boolean.TRUE.equals(settled))
+            {
+                updateDisposition(deliveryTag, state, true);
+            }
+        }
+        if(Boolean.TRUE.equals(settled) || localSettle)
         {
-            _unsettledMap.remove(unsettled.getDeliveryTag());
+            _unsettledActionMap.remove(deliveryTag);
+            _unsettledMap.remove(deliveryTag);
+            _unsettledMap2.remove(deliveryTag);
         }
     }
 
+    public ServerTransaction getTransaction(Binary transactionId)
+    {
+        Session_1_0 session = getSession();
+        return session == null ? null : session.getTransaction(transactionId);
+    }
+
+    public boolean hasCreditToSend()
+    {
+        UnsignedInteger linkCredit = getLinkCredit();
+        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+               && getSession().hasCreditToSend();
+    }
+
     public UnsignedInteger getLastDeliveryId()
     {
         return _lastDeliveryId;
@@ -219,9 +541,9 @@ public class SendingLinkEndpoint extends
     public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
     {
         UnsignedInteger deliveryId;
-        if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+        if (settled && (deliveryId = _unsettledMap.remove(deliveryTag)) != null)
         {
-            settle(deliveryTag);
+            _unsettledMap2.remove(deliveryTag);
             getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
         }
     }
@@ -231,4 +553,119 @@ public class SendingLinkEndpoint extends
         return _transactionId;
     }
 
+    public void doLinkAttachment(final Session_1_0 session, final MessageInstanceConsumer consumer) throws AmqpErrorException
+    {
+        if (session != null)
+        {
+            createConsumerTarget();
+
+            setSession(session);
+            _resumeAcceptedTransfers.clear();
+            _resumeFullTransfers.clear();
+            final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+            Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
+            Map initialUnsettledMap = getInitialUnsettledMap();
+
+            for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+            {
+                Binary deliveryTag = entry.getKey();
+                final MessageInstance queueEntry = entry.getValue();
+                if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+                {
+                    queueEntry.setRedelivered();
+                    queueEntry.release(consumer);
+                    _unsettledMap2.remove(deliveryTag);
+                }
+                else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+                {
+                    Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+
+                    if (outcome instanceof Accepted)
+                    {
+                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+                        if (consumer.acquires())
+                        {
+                            if (queueEntry.acquire() || queueEntry.isAcquired())
+                            {
+                                txn.dequeue(Collections.singleton(queueEntry),
+                                            new ServerTransaction.Action()
+                                            {
+                                                public void postCommit()
+                                                {
+                                                    queueEntry.delete();
+                                                }
+
+                                                public void onRollback()
+                                                {
+                                                }
+                                            });
+                            }
+                        }
+                    }
+                    else if (outcome instanceof Released)
+                    {
+                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+                        if (consumer.acquires())
+                        {
+                            txn.dequeue(Collections.singleton(queueEntry),
+                                        new ServerTransaction.Action()
+                                        {
+                                            public void postCommit()
+                                            {
+                                                queueEntry.release(consumer);
+                                            }
+
+                                            public void onRollback()
+                                            {
+                                            }
+                                        });
+                        }
+                    }
+                    //_unsettledMap.remove(deliveryTag);
+                    initialUnsettledMap.remove(deliveryTag);
+                    _resumeAcceptedTransfers.add(deliveryTag);
+                }
+                else
+                {
+                    _resumeFullTransfers.add(queueEntry);
+                    // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+                }
+                // TODO - else
+            }
+        }
+
+        getConsumerTarget().updateNotifyWorkDesired();
+    }
+
+    public Map<Binary, MessageInstance> getUnsettledOutcomeMap()
+    {
+        Map<Binary, MessageInstance> unsettled = new HashMap<>(_unsettledMap2);
+
+        for (Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
+        {
+            entry.setValue(null);
+        }
+
+        return unsettled;
+    }
+
+    public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
+    {
+        return _consumer;
+    }
+
+    public ConsumerTarget_1_0 getConsumerTarget()
+    {
+        return _consumerTarget;
+    }
+
+    public SendingDestination getDestination()
+    {
+        return _destination;
+    }
+
+    public void setDestination(final SendingDestination destination)
+    {
+        _destination = destination;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,541 +20,28 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.filter.SelectorParsingException;
-import org.apache.qpid.server.filter.selector.ParseException;
-import org.apache.qpid.server.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerOption;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.NotFoundException;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class SendingLink_1_0 implements Link_1_0
 {
-    private static final Logger _logger = LoggerFactory.getLogger(SendingLink_1_0.class);
-    private final EnumSet<ConsumerOption> _consumerOptions;
-    private final FilterManager _consumerFilters;
-
-    private NamedAddressSpace _addressSpace;
-    private volatile SendingDestination _destination;
-
-    private MessageInstanceConsumer _consumer;
-    private ConsumerTarget_1_0 _target;
-
-    private boolean _draining;
-    private final Map<Binary, MessageInstance> _unsettledMap =
-            new HashMap<Binary, MessageInstance>();
-
-    private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap =
-            new ConcurrentHashMap<Binary, UnsettledAction>();
-    private volatile SendingLinkAttachment _linkAttachment;
-    private TerminusDurability _durability;
-    private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
-    private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
-    private Runnable _closeAction;
-
-
-    public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
-                           final NamedAddressSpace addressSpace,
-                           final SendingDestination destination)
-            throws AmqpErrorException
-    {
-        _addressSpace = addressSpace;
-        _destination = destination;
-        _linkAttachment = linkAttachment;
-        final Source source = (Source) linkAttachment.getSource();
-        _durability = source.getDurable();
-
-        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
-
-        boolean noLocal = false;
-        JMSSelectorFilter messageFilter = null;
-
-        if(destination instanceof ExchangeDestination)
-        {
-            options.add(ConsumerOption.ACQUIRES);
-            options.add(ConsumerOption.SEES_REQUEUES);
-        }
-        else if(destination instanceof MessageSourceDestination)
-        {
-            MessageSource messageSource = _destination.getMessageSource();
-
-            if(messageSource instanceof Queue && ((Queue<?>)messageSource).getAvailableAttributes().contains("topic"))
-            {
-                source.setDistributionMode(StdDistMode.COPY);
-            }
-
-            Map<Symbol,Filter> filters = source.getFilter();
-
-            Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
-
-            if(filters != null)
-            {
-                for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
-                {
-                    if(entry.getValue() instanceof NoLocalFilter)
-                    {
-                        actualFilters.put(entry.getKey(), entry.getValue());
-                        noLocal = true;
-                    }
-                    else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
-                    {
-
-                        org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
-                        try
-                        {
-                            messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
-
-                            actualFilters.put(entry.getKey(), entry.getValue());
-                        }
-                        catch (ParseException | SelectorParsingException | TokenMgrError e)
-                        {
-                            Error error = new Error();
-                            error.setCondition(AmqpError.INVALID_FIELD);
-                            error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
-                            error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
-                            throw new AmqpErrorException(error);
-                        }
-
-
-                    }
-                }
-            }
-            source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
-            if(source.getDistributionMode() != StdDistMode.COPY)
-            {
-                options.add(ConsumerOption.ACQUIRES);
-                options.add(ConsumerOption.SEES_REQUEUES);
-            }
-        }
-        else
-        {
-            throw new ConnectionScopedRuntimeException("Unknown destination type");
-        }
-        if(noLocal)
-        {
-            options.add(ConsumerOption.NO_LOCAL);
-        }
-
-        FilterManager filters = null;
-        if(messageFilter != null)
-        {
-            filters = new FilterManager();
-            filters.add(messageFilter.getName(), messageFilter);
-        }
-        _consumerOptions = options;
-        _consumerFilters = filters;
-    }
-
-    void createConsumerTarget() throws AmqpErrorException
-    {
-        final Source source = (Source) getEndpoint().getSource();
-        _target = new ConsumerTarget_1_0(this, _destination instanceof ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
-        try
-        {
-            final String name;
-            if(getEndpoint().getTarget() instanceof Target)
-            {
-                Target target = (Target) getEndpoint().getTarget();
-                name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
-            }
-            else
-            {
-                name = getEndpoint().getName();
-            }
-
-            _consumer = _destination.getMessageSource()
-                                    .addConsumer(_target,
-                                                 _consumerFilters,
-                                                 Message_1_0.class,
-                                                 name,
-                                                 _consumerOptions,
-                                                 getEndpoint().getPriority());
-            _target.updateNotifyWorkDesired();
-        }
-        catch (MessageSource.ExistingExclusiveConsumer e)
-        {
-            String msg = "Cannot add a consumer to the destination as there is already an exclusive consumer";
-            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
-        }
-        catch (MessageSource.ExistingConsumerPreventsExclusive e)
-        {
-            String msg = "Cannot add an exclusive consumer to the destination as there is already a consumer";
-            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
-        }
-        catch (MessageSource.ConsumerAccessRefused e)
-        {
-            String msg = "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy";
-            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
-        }
-        catch (MessageSource.QueueDeleted e)
-        {
-            String msg = "Cannot add a consumer to the destination as the destination has been deleted";
-            throw new AmqpErrorException(new Error(AmqpError.RESOURCE_DELETED, msg), e);
-        }
-    }
-
-    public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
-    {
-        _target.close();
-        //TODO
-        // if not durable or close
-        if (Boolean.TRUE.equals(detach.getClosed())
-            || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals( _durability)))
-        {
-
-            Modified state = new Modified();
-            state.setDeliveryFailed(true);
 
-            for(UnsettledAction action : _unsettledActionMap.values())
-            {
-                action.process(state,Boolean.TRUE);
-            }
-            _unsettledActionMap.clear();
+    private volatile SendingLinkEndpoint _linkEndpoint;
 
-            endpoint.close();
 
-            if(_destination instanceof ExchangeDestination
-               && (_durability == TerminusDurability.CONFIGURATION
-                    || _durability == TerminusDurability.UNSETTLED_STATE))
-            {
-                try
-                {
-                    if (getAddressSpace() instanceof QueueManagingVirtualHost)
-                    {
-                        ((QueueManagingVirtualHost) getAddressSpace()).removeSubscriptionQueue(((ExchangeDestination) _destination).getQueue().getName());
-                    }
-                }
-                catch (AccessControlException e)
-                {
-                    _logger.error("Error unregistering subscription", e);
-                    endpoint.detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
-                }
-                catch (IllegalStateException e)
-                {
-                    endpoint.detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
-                }
-                catch (NotFoundException e)
-                {
-                    endpoint.detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
-                }
-            }
-
-            if(_closeAction != null)
-            {
-                _closeAction.run();
-            }
-
-        }
-        else if (detach.getError() != null && !_linkAttachment.getEndpoint()
-                                                              .getSession()
-                                                              .isSyntheticError(detach.getError()))
-        {
-            _linkAttachment = null;
-            _target.flowStateChanged();
-        }
-        else
-        {
-            endpoint.detach();
-            _target.updateNotifyWorkDesired();
-        }
-    }
-
-    public void start()
+    public SendingLink_1_0(final SendingLinkEndpoint linkEndpoint)
     {
-        //TODO
+        _linkEndpoint = linkEndpoint;
     }
 
     public SendingLinkEndpoint getEndpoint()
     {
-        return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
-    }
-
-    public Session_1_0 getSession()
-    {
-        return _linkAttachment == null ? null : _linkAttachment.getSession();
-    }
-
-    public void flowStateChanged()
-    {
-        if(Boolean.TRUE.equals(getEndpoint().getDrain())
-                && hasCredit())
-        {
-            _draining = true;
-            _target.flush();
-        }
-
-        while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
-        {
-            Accepted accepted = new Accepted();
-            Transfer xfr = new Transfer();
-            Binary dt = _resumeAcceptedTransfers.remove(0);
-            xfr.setDeliveryTag(dt);
-            xfr.setState(accepted);
-            xfr.setResume(Boolean.TRUE);
-            getEndpoint().transfer(xfr, true);
-            xfr.dispose();
-        }
-        if(_resumeAcceptedTransfers.isEmpty())
-        {
-            _target.flowStateChanged();
-        }
-
-    }
-
-    boolean hasCredit()
-    {
-        return getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
-    }
-
-    public boolean isDraining()
-    {
-        return false;  //TODO
-    }
-
-    public boolean drained()
-    {
-        if(getEndpoint() != null)
-        {
-            if (_draining)
-            {
-                //TODO
-                getEndpoint().drained();
-                _draining = false;
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-
-        }
-        else
-        {
-            return false;
-        }
-    }
-
-    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
-    {
-        _unsettledActionMap.put(tag,unsettledAction);
-        if(getTransactionId() == null)
-        {
-            _unsettledMap.put(tag, queueEntry);
-        }
-    }
-
-    public void removeUnsettled(Binary tag)
-    {
-        _unsettledActionMap.remove(tag);
-    }
-
-    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
-    {
-        UnsettledAction action = _unsettledActionMap.get(deliveryTag);
-        boolean localSettle = false;
-        if(action != null)
-        {
-            localSettle = action.process(state, settled);
-            if(localSettle && !Boolean.TRUE.equals(settled))
-            {
-                _linkAttachment.updateDisposition(deliveryTag, state, true);
-            }
-        }
-        if(Boolean.TRUE.equals(settled) || localSettle)
-        {
-            _unsettledActionMap.remove(deliveryTag);
-            _unsettledMap.remove(deliveryTag);
-        }
-    }
-
-    ServerTransaction getTransaction(Binary transactionId)
-    {
-        Session_1_0 session = _linkAttachment.getSession();
-        return session == null ? null : session.getTransaction(transactionId);
-    }
-
-    public Binary getTransactionId()
-    {
-        SendingLinkEndpoint endpoint = getEndpoint();
-        return endpoint == null ? null : endpoint.getTransactionId();
-    }
-
-    public boolean isDetached()
-    {
-        return _linkAttachment == null || getEndpoint().isDetached();
-    }
-
-    public boolean isAttached()
-    {
-        return _linkAttachment != null && getEndpoint().isAttached();
-    }
-
-    public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) throws AmqpErrorException
-    {
-        _linkAttachment = linkAttachment;
-
-        if (linkAttachment.getSession() != null)
-        {
-            SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
-            Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-            Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
-            _resumeAcceptedTransfers.clear();
-            _resumeFullTransfers.clear();
-
-            createConsumerTarget();
-
-            for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
-            {
-                Binary deliveryTag = entry.getKey();
-                final MessageInstance queueEntry = entry.getValue();
-                if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
-                {
-                    queueEntry.setRedelivered();
-                    queueEntry.release(_consumer);
-                    _unsettledMap.remove(deliveryTag);
-                }
-                else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
-                {
-                    Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
-
-                    if (outcome instanceof Accepted)
-                    {
-                        AutoCommitTransaction txn = new AutoCommitTransaction(_addressSpace.getMessageStore());
-                        if (_consumer.acquires())
-                        {
-                            if (queueEntry.acquire() || queueEntry.isAcquired())
-                            {
-                                txn.dequeue(Collections.singleton(queueEntry),
-                                            new ServerTransaction.Action()
-                                            {
-                                                public void postCommit()
-                                                {
-                                                    queueEntry.delete();
-                                                }
-
-                                                public void onRollback()
-                                                {
-                                                }
-                                            });
-                            }
-                        }
-                    }
-                    else if (outcome instanceof Released)
-                    {
-                        AutoCommitTransaction txn = new AutoCommitTransaction(_addressSpace.getMessageStore());
-                        if (_consumer.acquires())
-                        {
-                            txn.dequeue(Collections.singleton(queueEntry),
-                                        new ServerTransaction.Action()
-                                        {
-                                            public void postCommit()
-                                            {
-                                                queueEntry.release(_consumer);
-                                            }
-
-                                            public void onRollback()
-                                            {
-                                            }
-                                        });
-                        }
-                    }
-                    //_unsettledMap.remove(deliveryTag);
-                    initialUnsettledMap.remove(deliveryTag);
-                    _resumeAcceptedTransfers.add(deliveryTag);
-                }
-                else
-                {
-                    _resumeFullTransfers.add(queueEntry);
-                    // exists in receivers map, but not yet got an outcome ... should resend with resume = true
-                }
-                // TODO - else
-            }
-        }
-
-        _target.updateNotifyWorkDesired();
-    }
-
-    public Map getUnsettledOutcomeMap()
-    {
-        Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
-
-        for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
-        {
-            entry.setValue(null);
-        }
-
-        return unsettled;
-    }
-
-    public void setCloseAction(Runnable action)
-    {
-        _closeAction = action;
-    }
-
-    public NamedAddressSpace getAddressSpace()
-    {
-        return _addressSpace;
-    }
-
-    public MessageInstanceConsumer getConsumer()
-    {
-        return _consumer;
-    }
-
-    public ConsumerTarget_1_0 getConsumerTarget()
-    {
-        return _target;
-    }
-
-    public SendingDestination getDestination()
-    {
-        return _destination;
+        return _linkEndpoint;
     }
 
-    public void setDestination(final SendingDestination destination)
+    public synchronized void setLinkAttachment(final Session_1_0 session,
+                                               final SendingLinkEndpoint linkEndpoint) throws AmqpErrorException
     {
-        _destination = destination;
+        _linkEndpoint = linkEndpoint;
+        _linkEndpoint.doLinkAttachment(session, getEndpoint().getConsumer());
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 16 16:11:27 2017
@@ -228,9 +228,26 @@ public class Session_1_0 extends Abstrac
                 LinkEndpoint endpoint = linkMap.get(attach.getName());
                 if(endpoint == null)
                 {
-                    endpoint = attach.getRole() == Role.RECEIVER
-                               ? new SendingLinkEndpoint(this, attach)
-                               : new ReceivingLinkEndpoint(this, attach);
+
+                    if (attach.getRole() == Role.RECEIVER)
+                    {
+                        endpoint = new SendingLinkEndpoint(this, attach);
+                    }
+                    else if (attach.getRole() == Role.SENDER)
+                    {
+                        if (attach.getTarget() instanceof Coordinator)
+                        {
+                            endpoint = new TxnCoordinatorReceivingLinkEndpoint(this, attach);
+                        }
+                        else
+                        {
+                            endpoint = new StandardReceivingLinkEndpoint(this, attach);
+                        }
+                    }
+                    else
+                    {
+                        // TODO error handling
+                    }
 
                     if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER)
                     {
@@ -770,7 +787,7 @@ public class Session_1_0 extends Abstrac
         }
         else
         {
-            link.start();
+            endpoint.start();
         }
     }
 
@@ -873,17 +890,15 @@ public class Session_1_0 extends Abstrac
             if (destination != null)
             {
                 final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+
                 MessageDestination messageDestination = receivingDestination.getMessageDestination();
                 if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
                 {
                     capabilities.add(DELAYED_DELIVERY);
                 }
                 final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                final StandardReceivingLink_1_0 receivingLink =
-                        new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
-                                                      getAddressSpace(),
-                                                      receivingDestination);
-
+                final StandardReceivingLink_1_0 receivingLink = new StandardReceivingLink_1_0(receivingLinkEndpoint);
+                receivingLinkEndpoint.setDestination(receivingDestination);
                 receivingLinkEndpoint.setLink(receivingLink);
                 link = receivingLink;
                 if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
@@ -895,11 +910,11 @@ public class Session_1_0 extends Abstrac
         }
         else
         {
-            ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-            previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+            StandardReceivingLinkEndpoint receivingLinkEndpoint = (StandardReceivingLinkEndpoint) endpoint;
+            previousLink.setLinkAttachment(receivingLinkEndpoint);
             receivingLinkEndpoint.setLink(previousLink);
             link = previousLink;
-            endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+            receivingLinkEndpoint.setLocalUnsettled(receivingLinkEndpoint.getUnsettledOutcomeMap());
         }
         return link;
     }
@@ -934,9 +949,8 @@ public class Session_1_0 extends Abstrac
 
         final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
         final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
-                new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
-                                                    this,
-                                                    receivingLinkEndpoint
+                new TxnCoordinatorReceivingLink_1_0(
+                        receivingLinkEndpoint
                 );
         receivingLinkEndpoint.setLink(coordinatorLink);
         return coordinatorLink;
@@ -1056,13 +1070,16 @@ public class Session_1_0 extends Abstrac
             if (destination != null)
             {
                 final SendingLink_1_0 sendingLink =
-                        new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
-                                            getAddressSpace(),
-                                            destination);
-                sendingLink.createConsumerTarget();
+                        new SendingLink_1_0(
+                                sendingLinkEndpoint);
+                //sendingLink.createConsumerTarget();
+
+                sendingLinkEndpoint.doStuff(destination);
+                sendingLinkEndpoint.createConsumerTarget();
 
                 sendingLinkEndpoint.setLink(sendingLink);
-                registerConsumer(sendingLink);
+                sendingLinkEndpoint.setDurability(((Source) attach.getSource()).getDurable());
+                registerConsumer(sendingLinkEndpoint);
 
                 if (destination instanceof ExchangeDestination)
                 {
@@ -1084,31 +1101,42 @@ public class Session_1_0 extends Abstrac
             Source newSource = (Source) attach.getSource();
             Source oldSource = (Source) previousLink.getEndpoint().getSource();
 
-            if (previousLink.getDestination() instanceof ExchangeDestination && newSource != null && !Boolean.TRUE.equals(newSource.getDynamic()))
+            if (sendingLinkEndpoint.getDestination() == null)
             {
-                final SendingDestination newDestination = getSendingDestination(previousLink.getEndpoint().getName(), newSource);
-                if (updateSourceForSubscription(previousLink, newSource, newDestination))
+                final SendingDestination oldDestination =
+                        getSendingDestination(previousLink.getEndpoint().getName(), oldSource);
+                sendingLinkEndpoint.doStuff(oldDestination);
+                sendingLinkEndpoint.setDurability(oldSource.getDurable());
+
+            }
+
+            if (sendingLinkEndpoint.getDestination() instanceof ExchangeDestination
+                && newSource != null
+                && !Boolean.TRUE.equals(newSource.getDynamic()))
+            {
+                final SendingDestination newDestination =
+                        getSendingDestination(previousLink.getEndpoint().getName(), newSource);
+                if (updateSourceForSubscription(sendingLinkEndpoint, newSource, newDestination))
                 {
-                    previousLink.setDestination(newDestination);
+                    sendingLinkEndpoint.setDestination(newDestination);
                 }
             }
 
             sendingLinkEndpoint.setSource(oldSource);
-            previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+            previousLink.setLinkAttachment(this, sendingLinkEndpoint);
             sendingLinkEndpoint.setLink(previousLink);
             link = previousLink;
-            sendingLinkEndpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
-            registerConsumer(previousLink);
+            sendingLinkEndpoint.setLocalUnsettled(sendingLinkEndpoint.getUnsettledOutcomeMap());
+            registerConsumer(sendingLinkEndpoint);
 
         }
         return link;
     }
 
-    private boolean updateSourceForSubscription(final SendingLink_1_0 previousLink,
-                                                final Source newSource,
+    private boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource,
                                                 final SendingDestination newDestination)
     {
-        SendingDestination oldDestination = previousLink.getDestination();
+        SendingDestination oldDestination = linkEndpoint.getDestination();
         if (oldDestination instanceof ExchangeDestination)
         {
             ExchangeDestination oldExchangeDestination = (ExchangeDestination) oldDestination;
@@ -1118,7 +1146,7 @@ public class Session_1_0 extends Abstrac
                 ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination;
                 if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
                 {
-                    Source oldSource = (Source) previousLink.getEndpoint().getSource();
+                    Source oldSource = (Source) linkEndpoint.getSource();
                     oldSource.setAddress(newAddress);
                     oldSource.setFilter(newSource.getFilter());
                     return true;
@@ -1354,9 +1382,9 @@ public class Session_1_0 extends Abstrac
     }
 
 
-    private void registerConsumer(final SendingLink_1_0 link)
+    private void registerConsumer(final SendingLinkEndpoint linkEndpoint)
     {
-        MessageInstanceConsumer consumer = link.getConsumer();
+        MessageInstanceConsumer consumer = linkEndpoint.getConsumer();
         if(consumer instanceof Consumer<?,?>)
         {
             Consumer<?,ConsumerTarget_1_0> modelConsumer = (Consumer<?,ConsumerTarget_1_0>) consumer;
@@ -1563,8 +1591,7 @@ public class Session_1_0 extends Abstrac
     {
         for(SendingLinkEndpoint endpoint : _sendingLinkMap.values())
         {
-            Link_1_0 link = endpoint.getLink();
-            ConsumerTarget_1_0 target = ((SendingLink_1_0)link).getConsumerTarget();
+            ConsumerTarget_1_0 target = endpoint.getConsumerTarget();
             target.flowStateChanged();
         }
 
@@ -1597,9 +1624,7 @@ public class Session_1_0 extends Abstrac
 
             for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
             {
-                StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-
-                if (isQueueDestinationForLink(queue, link.getDestination()))
+                if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination()))
                 {
                     endpoint.setStopped(true);
                 }
@@ -1639,8 +1664,7 @@ public class Session_1_0 extends Abstrac
             }
             for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
             {
-                StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-                if (isQueueDestinationForLink(queue, link.getDestination()))
+                if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination()))
                 {
                     endpoint.setStopped(false);
                 }
@@ -1701,8 +1725,7 @@ public class Session_1_0 extends Abstrac
             }
             for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
             {
-                StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-                if(!_blockingEntities.contains(link.getDestination()))
+                if(!_blockingEntities.contains(endpoint.getReceivingDestination()))
                 {
                     endpoint.setStopped(false);
                 }
@@ -1925,7 +1948,7 @@ public class Session_1_0 extends Abstrac
                     {
                         try
                         {
-                            link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+                            link.setLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint);
                         }
                         catch (AmqpErrorException e)
                         {
@@ -1947,7 +1970,7 @@ public class Session_1_0 extends Abstrac
                 {
                     if (link.getEndpoint() == linkEndpoint)
                     {
-                        link.setLinkAttachment(new ReceivingLinkAttachment(null, (ReceivingLinkEndpoint) linkEndpoint));
+                        link.setLinkAttachment((ReceivingLinkEndpoint) linkEndpoint);
                     }
                 }
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org