You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/08 14:44:42 UTC

svn commit: r1785976 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ broke...

Author: lquack
Date: Wed Mar  8 14:44:41 2017
New Revision: 1785976

URL: http://svn.apache.org/viewvc?rev=1785976&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry. Address issues with previous commit.

 * Receiving and Coordinating Links are treated the same as far as link uniqueness is concerned
 * Only have one Link class. move most attach logic into specific linkEndpoints

Added:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
Removed:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Wed Mar  8 14:44:41 2017
@@ -55,7 +55,8 @@ public interface NamedAddressSpace exten
 
     MessageDestination getDefaultDestination();
 
-    <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
+    <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
+    <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
 
     boolean authoriseCreateConnection(AMQPConnection<?> connection);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java Wed Mar  8 14:44:41 2017
@@ -139,9 +139,14 @@ public abstract class AbstractNonConnect
     }
 
     @Override
-    public <T extends LinkModel> T getLink(final String remoteContainerId,
-                                           final String linkName,
-                                           final Class<T> type)
+    public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
+    {
+        throwUnsupported();
+        return null;
+    }
+
+    @Override
+    public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
     {
         throwUnsupported();
         return null;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Mar  8 14:44:41 2017
@@ -73,13 +73,12 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.exchange.DefaultDestination;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -100,6 +99,7 @@ import org.apache.qpid.server.model.pref
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemNodeCreator;
+import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.AccessControl;
@@ -1609,9 +1609,15 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
-    public <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type)
+    public <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName)
+    {
+        return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+    }
+
+    @Override
+    public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
     {
-        return _linkRegistry.getLink(remoteContainerId, linkName, type);
+        return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
     }
 
     public DtxRegistry getDtxRegistry()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java Wed Mar  8 14:44:41 2017
@@ -24,5 +24,6 @@ import org.apache.qpid.server.protocol.L
 
 public interface LinkRegistry
 {
-    <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
+    <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
+    <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Wed Mar  8 14:44:41 2017
@@ -47,16 +47,15 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
-public abstract class LinkEndpoint<T extends Link_1_0>
+public abstract class LinkEndpoint
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
-    private final T _link;
-    private Session_1_0 _session;
+    private final Link_1_0 _link;
+    private final Session_1_0 _session;
     private Object _flowTransactionId;
     private SenderSettleMode _sendingSettlementMode;
     private ReceiverSettleMode _receivingSettlementMode;
     private Map _initialUnsettledMap;
-    private Map _localUnsettled;
     private UnsignedInteger _lastSentCreditLimit;
     private volatile boolean _stopped;
     private volatile boolean _stoppedUpdated;
@@ -70,6 +69,7 @@ public abstract class LinkEndpoint<T ext
     private Map<Symbol, Object> _properties;
 
     protected volatile State _state = State.ATTACH_RECVD;
+    protected Map _localUnsettled;
 
     protected enum State
     {
@@ -82,8 +82,9 @@ public abstract class LinkEndpoint<T ext
     }
 
 
-    LinkEndpoint(final T link)
+    LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
     {
+        _session = session;
         _link = link;
     }
 
@@ -101,6 +102,38 @@ public abstract class LinkEndpoint<T ext
 
     protected abstract Map<Symbol,Object> initProperties(final Attach attach);
 
+
+    public void receiveAttach(final Attach attach) throws AmqpErrorException
+    {
+        boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
+        boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
+
+        if (isAttachingLocalTerminusNull)
+        {
+            recoverLink(attach);
+        }
+        else if (isLocalTerminusNull)
+        {
+            establishLink(attach);
+        }
+        else if (attach.getUnsettled() != null)
+        {
+            resumeLink(attach);
+        }
+        else
+        {
+            reattachLink(attach);
+        }
+    }
+
+    protected abstract void reattachLink(final Attach attach) throws AmqpErrorException;
+
+    protected abstract void resumeLink(final Attach attach) throws AmqpErrorException;
+
+    protected abstract void establishLink(final Attach attach) throws AmqpErrorException;
+
+    protected abstract void recoverLink(final Attach attach) throws AmqpErrorException;
+
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
         _sendingSettlementMode = attach.getSndSettleMode();
@@ -240,20 +273,9 @@ public abstract class LinkEndpoint<T ext
         return _session;
     }
 
-    public void associateSession(final Session_1_0 session)
-    {
-        if (session == null)
-        {
-            throw new IllegalStateException("To dissociate session from Endpoint call LinkEndpoint#dissociateSession() "
-                                            + "instead of LinkEndpoint#associate(null)");
-        }
-        _session = session;
-    }
-
-    public void dissociateSession()
+    public void destroy()
     {
         setLocalHandle(null);
-        _session = null;
         getLink().discardEndpoint();
     }
 
@@ -329,6 +351,11 @@ public abstract class LinkEndpoint<T ext
                 _state = State.DETACHED;
                 break;
             default:
+                if (close)
+                {
+                    destroy();
+                    _link.linkClosed();
+                }
                 return;
         }
 
@@ -347,7 +374,7 @@ public abstract class LinkEndpoint<T ext
 
         if (close)
         {
-            dissociateSession();
+            destroy();
             _link.linkClosed();
         }
         setLocalHandle(null);
@@ -435,7 +462,7 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    public T getLink()
+    public Link_1_0 getLink()
     {
         return _link;
     }
@@ -465,10 +492,7 @@ public abstract class LinkEndpoint<T ext
         return _initialUnsettledMap;
     }
 
-    public void setLocalUnsettled(Map unsettled)
-    {
-        _localUnsettled = unsettled;
-    }
+    public abstract void initialiseUnsettled();
 
     @Override public String toString()
     {

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1785976&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Wed Mar  8 14:44:41 2017
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class LinkImpl implements Link_1_0
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
+
+    private final String _linkName;
+    private final Role _role;
+    private volatile LinkEndpoint _linkEndpoint;
+    private volatile BaseSource _source;
+    private volatile BaseTarget _target;
+
+    LinkImpl(final String linkName, final Role role)
+    {
+        _linkName = linkName;
+        _role = role;
+    }
+
+    @Override
+    public final ListenableFuture<LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
+    {
+        try
+        {
+            if (_role == attach.getRole())
+            {
+                return rejectLink(session);
+            }
+
+
+            if (_linkEndpoint != null && !session.equals(_linkEndpoint.getSession()))
+            {
+                return stealLink(session, attach);
+            }
+            else
+            {
+                if (_linkEndpoint == null)
+                {
+                    _linkEndpoint = createLinkEndpoint(session, attach);
+                    if (_linkEndpoint == null)
+                    {
+                        throw new ConnectionScopedRuntimeException(String.format(
+                                "LinkEndpoint creation failed for attach: %s",
+                                attach));
+                    }
+                }
+
+                _linkEndpoint.receiveAttach(attach);
+                return Futures.immediateFuture(_linkEndpoint);
+            }
+        }
+        catch (Throwable t)
+        {
+            return rejectLink(session);
+        }
+    }
+
+    private synchronized ListenableFuture<LinkEndpoint> stealLink(final Session_1_0 session, final Attach attach)
+    {
+        final SettableFuture<LinkEndpoint> returnFuture = SettableFuture.create();
+        _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                _linkEndpoint.close(new Error(LinkError.STOLEN,
+                                              String.format("Link is being stolen by connection '%s'",
+                                                            session.getConnection())));
+                try
+                {
+                    returnFuture.set(attach(session, attach).get());
+                }
+                catch (InterruptedException e)
+                {
+                    returnFuture.setException(e);
+                    Thread.currentThread().interrupt();
+                }
+                catch (ExecutionException e)
+                {
+                    returnFuture.setException(e.getCause());
+                }
+            }
+        });
+        return returnFuture;
+    }
+
+    private LinkEndpoint createLinkEndpoint(final Session_1_0 session, final Attach attach)
+    {
+        LinkEndpoint linkEndpoint = null;
+        if (_role == Role.SENDER)
+        {
+            linkEndpoint = new SendingLinkEndpoint(session, this);
+        }
+        else if (_role == Role.RECEIVER && attach.getTarget() != null)
+        {
+
+            if (attach.getTarget() instanceof Target)
+            {
+                linkEndpoint = new StandardReceivingLinkEndpoint(session, this);
+            }
+            else if (attach.getTarget() instanceof Coordinator)
+            {
+                linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(session, this);
+            }
+        }
+        return linkEndpoint;
+    }
+
+
+    private ListenableFuture<LinkEndpoint> rejectLink(final Session_1_0 session)
+    {
+        _linkEndpoint = new SendingLinkEndpoint(session, this);
+        _source = null;
+        return Futures.immediateFuture(_linkEndpoint);
+    }
+
+    @Override
+    public void linkClosed()
+    {
+        discardEndpoint();
+    }
+
+    @Override
+    public void discardEndpoint()
+    {
+        _linkEndpoint = null;
+    }
+
+    @Override
+    public final String getName()
+    {
+        return _linkName;
+    }
+
+    @Override
+    public BaseSource getSource()
+    {
+        return _source;
+    }
+
+    @Override
+    public void setSource(BaseSource source)
+    {
+        _source = source;
+    }
+
+    @Override
+    public BaseTarget getTarget()
+    {
+        return _target;
+    }
+
+    @Override
+    public void setTarget(BaseTarget target)
+    {
+        _target = target;
+    }
+
+    @Override
+    public void setTermini(BaseSource source, BaseTarget target)
+    {
+        _source = source;
+        _target = target;
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Wed Mar  8 14:44:41 2017
@@ -24,15 +24,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.virtualhost.LinkRegistry;
 
 public class LinkRegistryImpl implements LinkRegistry
 {
-    private final Map<String, Map<String, SendingLink_1_0>> _sendingLinkRegistry = new HashMap<>();
-    private final Map<String, Map<String, StandardReceivingLink_1_0>> _receivingLinkRegistry = new HashMap<>();
-    private final Map<String, Map<String, TxnCoordinatorReceivingLink_1_0>> _coordinatorLinkRegistry = new HashMap<>();
+    private final Map<String, Map<String, Link_1_0>> _sendingLinkRegistry = new HashMap<>();
+    private final Map<String, Map<String, Link_1_0>> _receivingLinkRegistry = new HashMap<>();
+
     private final NamedAddressSpace _addressSpace;
 
     LinkRegistryImpl(final NamedAddressSpace addressSpace)
@@ -40,73 +39,31 @@ public class LinkRegistryImpl implements
         _addressSpace = addressSpace;
     }
 
-    @Override
-    public synchronized <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+    public Link_1_0 getSendingLink(final String remoteContainerId, final String linkName)
     {
-        if (SendingLink_1_0.class.equals(type))
-        {
-            return (T) getSendingLink(remoteContainerId, linkName);
-        }
-        else if (StandardReceivingLink_1_0.class.equals(type))
-        {
-            return (T) getReceivingLink(remoteContainerId, linkName);
-        }
-        else if (TxnCoordinatorReceivingLink_1_0.class.equals(type))
-        {
-            return (T) getCoordinatorLink(remoteContainerId, linkName);
-        }
-        else
-        {
-            throw new ConnectionScopedRuntimeException(String.format("Unsupported link type: '%s'", type.getSimpleName()));
-        }
-    }
-
-    private TxnCoordinatorReceivingLink_1_0 getCoordinatorLink(final String remoteContainerId, final String linkName)
-    {
-        Map<String, TxnCoordinatorReceivingLink_1_0> containerRegistry = _coordinatorLinkRegistry.get(remoteContainerId);
-        if (containerRegistry == null)
-        {
-            containerRegistry = new HashMap<>();
-            _coordinatorLinkRegistry.put(remoteContainerId, containerRegistry);
-        }
-        TxnCoordinatorReceivingLink_1_0 link = containerRegistry.get(linkName);
-        if (link == null)
-        {
-            link = new TxnCoordinatorReceivingLink_1_0(linkName);
-            containerRegistry.put(linkName, link);
-        }
-        return link;
+        return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
     }
 
-    private SendingLink_1_0 getSendingLink(final String remoteContainerId, final String linkName)
+    public Link_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
     {
-        Map<String, SendingLink_1_0> containerRegistry = _sendingLinkRegistry.get(remoteContainerId);
-        if (containerRegistry == null)
-        {
-            containerRegistry = new HashMap<>();
-            _sendingLinkRegistry.put(remoteContainerId, containerRegistry);
-        }
-        SendingLink_1_0 link = containerRegistry.get(linkName);
-        if (link == null)
-        {
-            link = new SendingLink_1_0(linkName);
-            containerRegistry.put(linkName, link);
-        }
-        return link;
+        return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
     }
 
-    private StandardReceivingLink_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
+    private Link_1_0 getLinkFromRegistry(final String remoteContainerId,
+                                         final String linkName,
+                                         final Map<String, Map<String, Link_1_0>> linkRegistry,
+                                         final Role role)
     {
-        Map<String, StandardReceivingLink_1_0> containerRegistry = _receivingLinkRegistry.get(remoteContainerId);
+        Map<String, Link_1_0> containerRegistry = linkRegistry.get(remoteContainerId);
         if (containerRegistry == null)
         {
             containerRegistry = new HashMap<>();
-            _receivingLinkRegistry.put(remoteContainerId, containerRegistry);
+            linkRegistry.put(remoteContainerId, containerRegistry);
         }
-        StandardReceivingLink_1_0 link = containerRegistry.get(linkName);
+        Link_1_0 link = containerRegistry.get(linkName);
         if (link == null)
         {
-            link = new StandardReceivingLink_1_0(linkName);
+            link = new LinkImpl(linkName, role);
             containerRegistry.put(linkName, link);
         }
         return link;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Wed Mar  8 14:44:41 2017
@@ -29,7 +29,7 @@ import org.apache.qpid.server.protocol.v
 
 public interface Link_1_0 extends LinkModel
 {
-    ListenableFuture<? extends LinkEndpoint<?>> attach(Session_1_0 session, final Attach attach);
+    ListenableFuture<LinkEndpoint> attach(Session_1_0 session, final Attach attach);
 
     void linkClosed();
 
@@ -40,4 +40,10 @@ public interface Link_1_0 extends LinkMo
     BaseSource getSource();
 
     BaseTarget getTarget();
+
+    void setSource(BaseSource source);
+
+    void setTarget(BaseTarget target);
+
+    void setTermini(BaseSource source, BaseTarget target);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Wed Mar  8 14:44:41 2017
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -38,13 +39,11 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
-public abstract class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
+public abstract class ReceivingLinkEndpoint extends LinkEndpoint
 {
-
-
+    private final SectionDecoder _sectionDecoder;
     private UnsignedInteger _lastDeliveryId;
     private ReceivingDestination _receivingDestination;
-    private final SectionDecoder _sectionDecoder;
 
     private static class TransientState
     {
@@ -91,10 +90,12 @@ public abstract class ReceivingLinkEndpo
     private UnsignedInteger _drainLimit;
 
 
-    public ReceivingLinkEndpoint(final ReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
+    public ReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
     {
-        super(link);
-        _sectionDecoder = sectionDecoder;
+        super(session, link);
+        _sectionDecoder = new SectionDecoderImpl(session.getConnection()
+                                                        .getDescribedTypeRegistry()
+                                                        .getSectionDecoderRegistry());
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Wed Mar  8 14:44:41 2017
@@ -74,7 +74,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
-public class SendingLinkEndpoint extends LinkEndpoint<SendingLink_1_0>
+public class SendingLinkEndpoint extends LinkEndpoint
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
 
@@ -96,9 +96,9 @@ public class SendingLinkEndpoint extends
     private ConsumerTarget_1_0 _consumerTarget;
     private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
 
-    public SendingLinkEndpoint(final SendingLink_1_0 link)
+    public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl link)
     {
-        super(link);
+        super(session, link);
         setDeliveryCount(UnsignedInteger.valueOf(0));
         setAvailable(UnsignedInteger.valueOf(0));
         setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
@@ -287,6 +287,94 @@ public class SendingLinkEndpoint extends
     }
 
     @Override
+    protected void reattachLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() == null)
+        {
+            throw new IllegalStateException("Terminus should be set when resuming a Link.");
+        }
+        if (attach.getSource() == null)
+        {
+            throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+        }
+
+        Source newSource = (Source) attach.getSource();
+        Source oldSource = (Source) getSource();
+
+        final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource);
+        prepareConsumerOptionsAndFilters(destination);
+
+        if (getDestination() instanceof ExchangeDestination && !Boolean.TRUE.equals(newSource.getDynamic()))
+        {
+            final SendingDestination newDestination =
+                    getSession().getSendingDestination(getLinkName(), newSource);
+            if (getSession().updateSourceForSubscription(this, newSource, newDestination))
+            {
+                setDestination(newDestination);
+            }
+        }
+
+        attachReceived(attach);
+    }
+
+    @Override
+    protected void resumeLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() == null)
+        {
+            throw new IllegalStateException("Terminus should be set when resuming a Link.");
+        }
+        if (attach.getSource() == null)
+        {
+            throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+        }
+
+        Source newSource = (Source) attach.getSource();
+        Source oldSource = (Source) getSource();
+
+        final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource);
+        prepareConsumerOptionsAndFilters(destination);
+
+        if (getDestination() instanceof ExchangeDestination && !Boolean.TRUE.equals(newSource.getDynamic()))
+        {
+            final SendingDestination newDestination =
+                    getSession().getSendingDestination(getLinkName(), newSource);
+            if (getSession().updateSourceForSubscription(this, newSource, newDestination))
+            {
+                setDestination(newDestination);
+            }
+        }
+
+        attachReceived(attach);
+        initialiseUnsettled();
+    }
+
+    @Override
+    protected void establishLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() != null || getTarget() != null)
+        {
+            throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
+        }
+
+        attachReceived(attach);
+    }
+
+    @Override
+    protected void recoverLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() == null)
+        {
+            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
+        }
+
+        final SendingDestination destination = getSession().getSendingDestination(getLinkName(), (Source) getSource());
+        prepareConsumerOptionsAndFilters(destination);
+
+        attachReceived(attach);
+    }
+
+    @Override
     public Role getRole()
     {
         return Role.SENDER;
@@ -299,7 +387,7 @@ public class SendingLinkEndpoint extends
 
     public TerminusDurability getTerminusDurability()
     {
-        return getLink().getLocalTerminusDurability();
+        return ((Source) getSource()).getDurable();
     }
 
     public boolean transfer(final Transfer xfr, final boolean decrementCredit)
@@ -459,10 +547,10 @@ public class SendingLinkEndpoint extends
 
             close();
         }
-        else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
+        else if (detach.getError() != null)
         {
             detach();
-            dissociateSession();
+            destroy();
             getConsumerTarget().updateNotifyWorkDesired();
         }
         else
@@ -545,6 +633,36 @@ public class SendingLinkEndpoint extends
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
         super.attachReceived(attach);
+
+        Target target = (Target) attach.getTarget();
+        Source source = (Source) getSource();
+        if (source == null)
+        {
+            source = new Source();
+            Source attachSource = (Source) attach.getSource();
+
+            final SendingDestination destination = getSession().getSendingDestination(attach.getName(), attachSource);
+            source.setAddress(attachSource.getAddress());
+            source.setDynamic(attachSource.getDynamic());
+            source.setDurable(attachSource.getDurable());
+            source.setExpiryPolicy(attachSource.getExpiryPolicy());
+            source.setDistributionMode(attachSource.getDistributionMode());
+            source.setFilter(attachSource.getFilter());
+            source.setCapabilities(destination.getCapabilities());
+            if (destination instanceof ExchangeDestination)
+            {
+                ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+                exchangeDestination.getQueue()
+                                   .setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE,
+                                                                                           org.apache.qpid.server.model.State.ACTIVE));
+            }
+            getLink().setSource(source);
+            prepareConsumerOptionsAndFilters(destination);
+        }
+
+        getLink().setTarget(target);
+
+
         final MessageInstanceConsumer consumer = getConsumer();
         createConsumerTarget();
         _resumeAcceptedTransfers.clear();
@@ -622,16 +740,15 @@ public class SendingLinkEndpoint extends
         getConsumerTarget().updateNotifyWorkDesired();
     }
 
-    public Map<Binary, MessageInstance> getUnsettledOutcomeMap()
+    @Override
+    public void initialiseUnsettled()
     {
-        Map<Binary, MessageInstance> unsettled = new HashMap<>(_unsettledMap2);
+        Map<Binary, MessageInstance> _localUnsettled = new HashMap<>(_unsettledMap2);
 
-        for (Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
+        for (Map.Entry<Binary, MessageInstance> entry : _localUnsettled.entrySet())
         {
             entry.setValue(null);
         }
-
-        return unsettled;
     }
 
     public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Mar  8 14:44:41 2017
@@ -71,7 +71,6 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -95,7 +94,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -135,9 +133,9 @@ public class Session_1_0 extends Abstrac
 
     private SessionState _sessionState;
 
-    private final Map<LinkEndpoint<?>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
-    private final Map<UnsignedInteger, LinkEndpoint<?>> _inputHandleToEndpoint = new HashMap<>();
-    private final Set<LinkEndpoint<?>> _associatedLinkEndpoints = new HashSet<>();
+    private final Map<LinkEndpoint, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
+    private final Map<UnsignedInteger, LinkEndpoint> _inputHandleToEndpoint = new HashMap<>();
+    private final Set<LinkEndpoint> _associatedLinkEndpoints = new HashSet<>();
 
     private final short _receivingChannel;
     private final short _sendingChannel;
@@ -215,27 +213,17 @@ public class Session_1_0 extends Abstrac
             }
             else
             {
-                final Class<? extends LinkModel> linkType;
+                final Link_1_0 link;
                 if (attach.getRole() == Role.RECEIVER)
                 {
-                    linkType = SendingLink_1_0.class;
+                    link = getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), attach.getName());
                 }
                 else
                 {
-                    if (attach.getTarget() instanceof Coordinator)
-                    {
-                        linkType = TxnCoordinatorReceivingLink_1_0.class;
-                    }
-                    else
-                    {
-                        linkType = StandardReceivingLink_1_0.class;
-                    }
+                    link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
                 }
 
-                final Link_1_0 link = (Link_1_0) getAddressSpace().getLink(getConnection().getRemoteContainerId(),
-                                                                           attach.getName(),
-                                                                           linkType);
-                final ListenableFuture<? extends LinkEndpoint<?>> future = link.attach(this, attach);
+                final ListenableFuture<LinkEndpoint> future = link.attach(this, attach);
 
                 addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
             }
@@ -419,7 +407,7 @@ public class Session_1_0 extends Abstrac
     public void receiveFlow(final Flow flow)
     {
         UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint<?> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
+        final LinkEndpoint endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
 
         final UnsignedInteger nextOutgoingId =
                 flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
@@ -432,8 +420,8 @@ public class Session_1_0 extends Abstrac
         }
         else
         {
-            final Collection<LinkEndpoint<?>> allLinkEndpoints = _inputHandleToEndpoint.values();
-            for (LinkEndpoint<?> le : allLinkEndpoints)
+            final Collection<LinkEndpoint> allLinkEndpoints = _inputHandleToEndpoint.values();
+            for (LinkEndpoint le : allLinkEndpoints)
             {
                 le.flowStateChanged();
             }
@@ -577,7 +565,7 @@ public class Session_1_0 extends Abstrac
         _nextIncomingTransferId.incr();
 
         UnsignedInteger inputHandle = transfer.getHandle();
-        LinkEndpoint<?> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
+        LinkEndpoint linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
 
         if (linkEndpoint == null)
         {
@@ -1119,15 +1107,10 @@ public class Session_1_0 extends Abstrac
 
     void remoteEnd(End end)
     {
-        List<LinkEndpoint<?>> linkEndpoints = new ArrayList<>(_endpointToOutputHandle.keySet());
-        for(LinkEndpoint linkEndpoint : linkEndpoints)
+        for (LinkEndpoint linkEndpoint : _associatedLinkEndpoints)
         {
             linkEndpoint.remoteDetached(new Detach());
-            linkEndpoint.dissociateSession();
-        }
-        for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
-        {
-            linkEndpoint.dissociateSession();
+            linkEndpoint.destroy();
         }
         _associatedLinkEndpoints.clear();
 
@@ -1215,7 +1198,7 @@ public class Session_1_0 extends Abstrac
     @Override
     public void transportStateChanged()
     {
-        for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+        for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
         {
             if (linkEndpoint instanceof SendingLinkEndpoint)
             {
@@ -1251,7 +1234,7 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
 
-            for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof ReceivingLinkEndpoint
                     && isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1291,7 +1274,7 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof ReceivingLinkEndpoint
                         && isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1322,7 +1305,7 @@ public class Session_1_0 extends Abstrac
         {
             messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
 
-            for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof ReceivingLinkEndpoint)
                 {
@@ -1354,7 +1337,7 @@ public class Session_1_0 extends Abstrac
             {
                 messageWithSubject(ChannelMessages.FLOW_REMOVED());
             }
-            for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+            for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
             {
                 if (linkEndpoint instanceof ReceivingLinkEndpoint
                     && !_blockingEntities.contains(((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1532,7 +1515,7 @@ public class Session_1_0 extends Abstrac
     {
         if(_inputHandleToEndpoint.containsKey(handle))
         {
-            LinkEndpoint<?> endpoint = _inputHandleToEndpoint.remove(handle);
+            LinkEndpoint endpoint = _inputHandleToEndpoint.remove(handle);
             endpoint.remoteDetached(detach);
             _endpointToOutputHandle.remove(endpoint);
         }
@@ -1553,11 +1536,6 @@ public class Session_1_0 extends Abstrac
             detach.setError(_sessionEndedLinkError);
             detach(handle, detach);
         }
-
-        for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
-        {
-            linkEndpoint.dissociateSession();
-        }
     }
 
 
@@ -1609,7 +1587,7 @@ public class Session_1_0 extends Abstrac
         return primaryDomain;
     }
 
-    private class EndpointCreationCallback<T extends LinkEndpoint<?>> implements FutureCallback<T>
+    private class EndpointCreationCallback implements FutureCallback<LinkEndpoint>
     {
 
         private final Attach _attach;
@@ -1620,7 +1598,7 @@ public class Session_1_0 extends Abstrac
         }
 
         @Override
-        public void onSuccess(final T endpoint)
+        public void onSuccess(final LinkEndpoint endpoint)
         {
             doOnIOThreadAsync(new Runnable()
             {
@@ -1671,7 +1649,7 @@ public class Session_1_0 extends Abstrac
             throw new ConnectionScopedRuntimeException(errorMessage, t);
         }
 
-        private boolean attachWasUnsuccessful(final T endpoint)
+        private boolean attachWasUnsuccessful(final LinkEndpoint endpoint)
         {
             if (endpoint.getRole().equals(Role.SENDER))
             {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Wed Mar  8 14:44:41 2017
@@ -36,7 +36,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -78,9 +78,10 @@ public class StandardReceivingLinkEndpoi
     private Binary _messageDeliveryTag;
     private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
 
-    public StandardReceivingLinkEndpoint(final StandardReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
+    public StandardReceivingLinkEndpoint(final Session_1_0 session,
+                                         final Link_1_0 link)
     {
-        super(link, sectionDecoder);
+        super(session, link);
     }
 
     @Override
@@ -331,7 +332,7 @@ public class StandardReceivingLinkEndpoi
         else if(detach == null || detach.getError() != null)
         {
             detach();
-            dissociateSession();
+            destroy();
         }
         else
         {
@@ -339,6 +340,7 @@ public class StandardReceivingLinkEndpoi
         }
     }
 
+
     @Override
     protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
     {
@@ -455,8 +457,21 @@ public class StandardReceivingLinkEndpoi
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
         super.attachReceived(attach);
+
+        Source source = (Source) attach.getSource();
+        Target target = new Target();
+        Target attachTarget = (Target) attach.getTarget();
+
         setDeliveryCount(attach.getInitialDeliveryCount());
 
+        final ReceivingDestination destination = getSession().getReceivingDestination(attachTarget);
+        target.setAddress(attachTarget.getAddress());
+        target.setDynamic(attachTarget.getDynamic());
+        target.setCapabilities(destination.getCapabilities());
+
+        setCapabilities(Arrays.asList(destination.getCapabilities()));
+        setDestination(destination);
+
         Map initialUnsettledMap = getInitialUnsettledMap();
         Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
         for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
@@ -467,12 +482,77 @@ public class StandardReceivingLinkEndpoi
                 _unsettledMap.remove(deliveryTag);
             }
         }
+
+        getLink().setTermini(source, target);
+    }
+
+    @Override
+    public void initialiseUnsettled()
+    {
+        _localUnsettled = new HashMap(_unsettledMap);
+    }
+
+    @Override
+    protected void recoverLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getTarget() == null || !(getTarget() instanceof Target))
+        {
+            throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND,
+                                                   String.format("Link '%s' not found", getLinkName())));
+        }
+
+        Source source = (Source) attach.getSource();
+        Target target = (Target) getTarget();
+
+        final ReceivingDestination destination = getSession().getReceivingDestination((Target) getTarget());
+        target.setCapabilities(destination.getCapabilities());
+        setCapabilities(Arrays.asList(destination.getCapabilities()));
+        setDestination(destination);
+        attachReceived(attach);
+
+        getLink().setTermini(source, target);
+    }
+
+
+    @Override
+    protected void reattachLink(final Attach attach) throws AmqpErrorException
+    {
+        if (attach.getTarget() instanceof Coordinator)
+        {
+            throw new AmqpErrorException(new Error(AmqpError.PRECONDITION_FAILED, "Cannot reattach standard receiving Link as a transaction coordinator"));
+        }
+
+        attachReceived(attach);
     }
 
-    public Map<Binary, Outcome> getUnsettledOutcomeMap()
+    @Override
+    protected void resumeLink(final Attach attach) throws AmqpErrorException
     {
-        return _unsettledMap;
+        if (getTarget() == null)
+        {
+            throw new IllegalStateException("Terminus should be set when resuming a Link.");
+        }
+        if (attach.getTarget() == null)
+        {
+            throw new IllegalStateException("Attach.getTarget should not be null when resuming a Link. That would be recovering the Link.");
+        }
+        if (attach.getTarget() instanceof Coordinator)
+        {
+            throw new AmqpErrorException(new Error(AmqpError.PRECONDITION_FAILED, "Cannot resume standard receiving Link as a transaction coordinator"));
+        }
+
+        attachReceived(attach);
+        initialiseUnsettled();
     }
 
+    @Override
+    protected void establishLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() != null || getTarget() != null)
+        {
+            throw new IllegalStateException("Termini should be null when establishing a Link.");
+        }
 
+        attachReceived(attach);
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Wed Mar  8 14:44:41 2017
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -33,6 +32,8 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
@@ -51,10 +52,9 @@ public class TxnCoordinatorReceivingLink
     private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
     private ArrayList<Transfer> _incompleteMessage;
 
-    public TxnCoordinatorReceivingLinkEndpoint(final TxnCoordinatorReceivingLink_1_0 link,
-                                               final SectionDecoder sectionDecoder)
+    public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
     {
-        super(link, sectionDecoder);
+        super(session, link);
     }
 
     @Override
@@ -217,6 +217,39 @@ public class TxnCoordinatorReceivingLink
     }
 
     @Override
+    protected void reattachLink(final Attach attach) throws AmqpErrorException
+    {
+        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
+    }
+
+    @Override
+    protected void resumeLink(final Attach attach) throws AmqpErrorException
+    {
+        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot resume a Coordinator Link."));
+    }
+
+    @Override
+    protected void establishLink(final Attach attach) throws AmqpErrorException
+    {
+        if (getSource() != null || getTarget() != null)
+        {
+            throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
+        }
+
+        Coordinator target = new Coordinator();
+        Source source = (Source) attach.getSource();
+        getLink().setTermini(source, target);
+
+        attachReceived(attach);
+    }
+
+    @Override
+    protected void recoverLink(final Attach attach) throws AmqpErrorException
+    {
+        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot recover a Coordinator Link."));
+    }
+
+    @Override
     protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
 
@@ -228,4 +261,9 @@ public class TxnCoordinatorReceivingLink
         super.attachReceived(attach);
         setDeliveryCount(attach.getInitialDeliveryCount());
     }
+
+    @Override
+    public void initialiseUnsettled()
+    {
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java Wed Mar  8 14:44:41 2017
@@ -38,29 +38,25 @@ public class LinkRegistryTest extends Qp
         _linkRegistry = new LinkRegistryImpl(_virtualHost);
     }
 
-    private void doTestGetLink(final Class<? extends LinkModel> type) throws Exception
+    public void testGetSendingLink() throws Exception
     {
         String remoteContainerId = "testRemoteContainerId";
         String linkName = "testLinkName";
-        LinkModel link = _linkRegistry.getLink(remoteContainerId, linkName, type);
-        assertNotNull("LinkRegistry#getLink should always return an object", link);
-        LinkModel link2 = _linkRegistry.getLink(remoteContainerId, linkName, type);
-        assertNotNull("LinkRegistry#getLink should always return an object", link2);
-        assertSame("Two calls to LinkRegistry#getLink should return the same object", link, link2);
-    }
-
-    public void testGetSendingLink() throws Exception
-    {
-        doTestGetLink(SendingLink_1_0.class);
+        LinkModel link = _linkRegistry.getSendingLink(remoteContainerId, linkName);
+        assertNotNull("LinkRegistry#getSendingLink should always return an object", link);
+        LinkModel link2 = _linkRegistry.getSendingLink(remoteContainerId, linkName);
+        assertNotNull("LinkRegistry#getSendingLink should always return an object", link2);
+        assertSame("Two calls to LinkRegistry#getSendingLink should return the same object", link, link2);
     }
 
     public void testGetReceivingLink() throws Exception
     {
-        doTestGetLink(StandardReceivingLink_1_0.class);
-    }
-
-    public void testGetCoordinatingLink() throws Exception
-    {
-        doTestGetLink(TxnCoordinatorReceivingLink_1_0.class);
+        String remoteContainerId = "testRemoteContainerId";
+        String linkName = "testLinkName";
+        LinkModel link = _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+        assertNotNull("LinkRegistry#getReceivingLink should always return an object", link);
+        LinkModel link2 = _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+        assertNotNull("LinkRegistry#getReceivingLink should always return an object", link2);
+        assertSame("Two calls to LinkRegistry#getReceivingLink should return the same object", link, link2);
     }
 }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Wed Mar  8 14:44:41 2017
@@ -40,8 +40,8 @@ import javax.security.auth.Subject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSender;
@@ -56,7 +56,6 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
 import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.session.AMQPSession;
@@ -67,7 +66,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.txn.DtxNotSupportedException;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
 import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
 import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
 
@@ -228,9 +227,15 @@ public class ManagementAddressSpace impl
     }
 
     @Override
-    public <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+    public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
+    {
+        return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+    }
+
+    @Override
+    public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
     {
-        return _linkRegistry.getLink(remoteContainerId, linkName, type);
+        return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org