You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/06 15:06:57 UTC

svn commit: r1785660 [1/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/or...

Author: lquack
Date: Mon Mar  6 15:06:57 2017
New Revision: 1785660

URL: http://svn.apache.org/viewvc?rev=1785660&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
      - copied, changed from r1785348, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
      - copied, changed from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
      - copied, changed from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistryImpl.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Mar  6 15:06:57 2017
@@ -133,7 +133,7 @@ public abstract class AbstractConsumerTa
     @Override
     public boolean processPending()
     {
-        if (!getSession().getAMQPConnection().isIOThread())
+        if (getSession() == null || !getSession().getAMQPConnection().isIOThread())
         {
             return false;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Mon Mar  6 15:06:57 2017
@@ -2610,8 +2610,8 @@ public abstract class AbstractConfigured
         return returnVal;
     }
 
-    protected static <V> void addFutureCallback(ListenableFuture<V> future, final FutureCallback<V> callback,
-                                                Executor taskExecutor)
+    public static <V> void addFutureCallback(ListenableFuture<V> future, final FutureCallback<V> callback,
+                                             Executor taskExecutor)
     {
         final Subject subject = Subject.getSubject(AccessController.getContext());
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Mon Mar  6 15:06:57 2017
@@ -29,7 +29,7 @@ import java.util.UUID;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
@@ -55,7 +55,7 @@ public interface NamedAddressSpace exten
 
     MessageDestination getDefaultDestination();
 
-    LinkRegistry getLinkRegistry(String remoteContainerId);
+    <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
 
     boolean authoriseCreateConnection(AMQPConnection<?> connection);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java Mon Mar  6 15:06:57 2017
@@ -23,6 +23,8 @@ package org.apache.qpid.server.session;
 import java.util.Collection;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
@@ -95,4 +97,6 @@ public interface AMQPSession<S extends o
     void notifyWork(X target);
 
     void close();
+
+    ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java Mon Mar  6 15:06:57 2017
@@ -33,8 +33,10 @@ import java.util.concurrent.CopyOnWriteA
 import javax.security.auth.Subject;
 
 import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -59,6 +61,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.TransactionTimeoutTicker;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureHelper;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.transport.network.Ticker;
 
@@ -411,4 +414,18 @@ public abstract class AbstractAMQPSessio
     protected abstract void updateBlockedStateIfNecessary();
 
     public abstract boolean isClosing();
+
+    @Override
+    public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
+    {
+        final ListenableFuture<Void> future = getAMQPConnection().doOnIOThreadAsync(task);
+        return doAfter(MoreExecutors.directExecutor(), future, new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                getAMQPConnection().notifyWork(AbstractAMQPSession.this);
+            }
+        });
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java Mon Mar  6 15:06:57 2017
@@ -39,7 +39,7 @@ import org.apache.qpid.server.model.Conn
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
@@ -139,8 +139,11 @@ public abstract class AbstractNonConnect
     }
 
     @Override
-    public LinkRegistry getLinkRegistry(final String remoteContainerId)
+    public <T extends LinkModel> T getLink(final String remoteContainerId,
+                                           final String linkName,
+                                           final Class<T> type)
     {
+        throwUnsupported();
         return null;
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Mar  6 15:06:57 2017
@@ -100,8 +100,7 @@ import org.apache.qpid.server.model.pref
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.plugin.SystemNodeCreator;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.protocol.LinkRegistryImpl;
+import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.AccessControl;
 import org.apache.qpid.server.security.CompoundAccessControl;
@@ -179,7 +178,7 @@ public abstract class AbstractVirtualHos
 
     private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
-    private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+    private volatile LinkRegistry _linkRegistry;
     private AtomicBoolean _blocked = new AtomicBoolean();
 
     private final Map<String, MessageDestination> _systemNodeDestinations =
@@ -596,6 +595,21 @@ public abstract class AbstractVirtualHos
         PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
         _preferenceStore = preferencesRoot.createPreferenceStore();
 
+        Iterator<LinkRegistryFactory> linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
+        if (linkRegistryFactories.hasNext())
+        {
+            final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
+            if (linkRegistryFactories.hasNext())
+            {
+                throw new RuntimeException("Found multiple implementations of LinkRegistry");
+            }
+            _linkRegistry = linkRegistryFactory.create(this);
+        }
+        else
+        {
+            _linkRegistry = null;
+        }
+
         createHousekeepingExecutor();
     }
 
@@ -1594,15 +1608,10 @@ public abstract class AbstractVirtualHos
         }
     }
 
-    public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+    @Override
+    public <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type)
     {
-        LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
-        if(linkRegistry == null)
-        {
-            linkRegistry = new LinkRegistryImpl();
-            _linkRegistry.put(remoteContainerId, linkRegistry);
-        }
-        return linkRegistry;
+        return _linkRegistry.getLink(remoteContainerId, linkName, type);
     }
 
     public DtxRegistry getDtxRegistry()

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (from r1785348, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java Mon Mar  6 15:06:57 2017
@@ -18,17 +18,11 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.protocol.LinkModel;
 
 public interface LinkRegistry
 {
-    LinkModel getDurableSendingLink(String name);
-
-    boolean registerSendingLink(String name, LinkModel link);
-
-    boolean unregisterSendingLink(String name);
-
-    LinkModel getDurableReceivingLink(String name);
-
-    boolean registerReceivingLink(String name, LinkModel link);
+    <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java (from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java Mon Mar  6 15:06:57 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,14 +15,14 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.Pluggable;
 
-public interface ReceivingLink_1_0 extends Link_1_0
+public interface LinkRegistryFactory extends Pluggable
 {
-    void setLinkAttachmentToNull();
+    LinkRegistry create(NamedAddressSpace addressSpace);
 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java?rev=1785660&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java Mon Mar  6 15:06:57 2017
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public abstract class AbstractLink<T extends LinkEndpoint<?>> implements Link_1_0
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLink.class);
+
+    protected final String _linkName;
+    protected T _linkEndpoint;
+    protected volatile BaseSource _source;
+    protected volatile BaseTarget _target;
+
+    public AbstractLink(final String linkName)
+    {
+        _linkName = linkName;
+    }
+
+    @Override
+    public final ListenableFuture<T> attach(final Session_1_0 session, final Attach attach)
+    {
+        final ListenableFuture<T> future;
+        try
+        {
+            boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
+            boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
+
+            if (isAttachingLocalTerminusNull)
+            {
+                future = recoverLink(session, attach);
+            }
+            else if (isLocalTerminusNull)
+            {
+                future = establishLink(session, attach);
+            }
+            else if (_linkEndpoint != null && _linkEndpoint.getSession() != null && !session.equals(_linkEndpoint.getSession()))
+            {
+                future = stealLink(session, attach);
+            }
+            else if (attach.getUnsettled() != null)
+            {
+                future = resumeLink(session, attach);
+            }
+            else
+            {
+                future = reattachLink(session, attach);
+            }
+        }
+        catch (Throwable t)
+        {
+            return Futures.immediateFailedFuture(t);
+        }
+        AbstractConfiguredObject.addFutureCallback(future, new FutureCallback<T>()
+        {
+            @Override
+            public void onSuccess(final T result)
+            {
+                _linkEndpoint = result;
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+            }
+        }, MoreExecutors.directExecutor());
+        return future;
+    }
+
+    protected abstract ListenableFuture<T> recoverLink(final Session_1_0 session, final Attach attach);
+
+    protected abstract ListenableFuture<T> establishLink(final Session_1_0 session, final Attach attach);
+
+    protected abstract ListenableFuture<T> stealLink(final Session_1_0 session, final Attach attach);
+
+    protected abstract ListenableFuture<T> resumeLink(final Session_1_0 session, final Attach attach);
+
+    protected abstract ListenableFuture<T> reattachLink(final Session_1_0 session, final Attach attach);
+
+    @Override
+    public void linkClosed()
+    {
+        discardEndpoint();
+    }
+
+    @Override
+    public void discardEndpoint()
+    {
+        _linkEndpoint = null;
+    }
+
+    @Override
+    public final String getName()
+    {
+        return _linkName;
+    }
+
+    @Override
+    public BaseSource getSource()
+    {
+        return _source;
+    }
+
+    @Override
+    public BaseTarget getTarget()
+    {
+        return _target;
+    }
+
+    TerminusDurability getLocalTerminusDurability()
+    {
+        if (_linkEndpoint.getRole() == Role.SENDER)
+        {
+            return ((Source) getSource()).getDurable();
+        }
+        else if (getTarget() instanceof Target)
+        {
+            return ((Target) getTarget()).getDurable();
+        }
+        else
+        {
+            return TerminusDurability.NONE;
+        }
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Mar  6 15:06:57 2017
@@ -607,7 +607,7 @@ class ConsumerTarget_1_0 extends Abstrac
     {
         BaseTarget target = _linkEndpoint.getTarget();
 
-        return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getName();
+        return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getLinkName();
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Mar  6 15:06:57 2017
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -85,6 +87,7 @@ public class ExchangeDestination extends
             destinationCapabilities.add(REJECT_UNROUTABLE);
         }
         destinationCapabilities.add(TOPIC_CAPABILITY);
+        destinationCapabilities.add(DELAYED_DELIVERY);
 
         _capabilities = destinationCapabilities.toArray(new Symbol[destinationCapabilities.size()]);
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Mon Mar  6 15:06:57 2017
@@ -27,7 +27,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -45,8 +49,9 @@ import org.apache.qpid.server.protocol.v
 
 public abstract class LinkEndpoint<T extends Link_1_0>
 {
-
-    private T _link;
+    private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
+    private final T _link;
+    private Session_1_0 _session;
     private Object _flowTransactionId;
     private SenderSettleMode _sendingSettlementMode;
     private ReceiverSettleMode _receivingSettlementMode;
@@ -56,6 +61,15 @@ public abstract class LinkEndpoint<T ext
     private volatile boolean _stopped;
     private volatile boolean _stoppedUpdated;
     private Symbol[] _capabilities;
+    private UnsignedInteger _deliveryCount;
+    private UnsignedInteger _linkCredit;
+    private UnsignedInteger _available;
+    private Boolean _drain;
+    private UnsignedInteger _localHandle;
+    private UnsignedLong _maxMessageSize;
+    private Map<Symbol, Object> _properties;
+
+    protected volatile State _state = State.ATTACH_RECVD;
 
     protected enum State
     {
@@ -65,37 +79,37 @@ public abstract class LinkEndpoint<T ext
         ATTACHED,
         DETACH_SENT,
         DETACH_RECVD
-    };
+    }
 
-    private final String _name;
 
-    private Session_1_0 _session;
+    LinkEndpoint(final T link)
+    {
+        _link = link;
+    }
 
+    public abstract void start();
 
-    protected volatile State _state = State.DETACHED;
+    public abstract Role getRole();
 
-    private BaseSource _source;
-    private BaseTarget _target;
-    private UnsignedInteger _deliveryCount;
-    private UnsignedInteger _linkCredit;
-    private UnsignedInteger _available;
-    private Boolean _drain;
-    private UnsignedInteger _localHandle;
-    private UnsignedLong _maxMessageSize;
-    private Map<Symbol, Object> _properties;
+    public abstract void flowStateChanged();
 
-    LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
-    {
-        _session = sessionEndpoint;
+    public abstract void receiveFlow(final Flow flow);
+
+    protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
+    protected abstract void remoteDetachedPerformDetach(final Detach detach);
 
-        _name = attach.getName();
+    protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+
+    public void attachReceived(final Attach attach) throws AmqpErrorException
+    {
+        _sendingSettlementMode = attach.getSndSettleMode();
+        _receivingSettlementMode = attach.getRcvSettleMode();
         _initialUnsettledMap = attach.getUnsettled();
         _properties = initProperties(attach);
         _state = State.ATTACH_RECVD;
     }
 
-    public abstract void start();
-
     public boolean isStopped()
     {
         return _stopped;
@@ -111,38 +125,24 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    protected abstract Map<Symbol,Object> initProperties(final Attach attach);
-
-    public String getName()
+    public String getLinkName()
     {
-        return _name;
+        return _link.getName();
     }
 
-    public abstract Role getRole();
-
     public BaseSource getSource()
     {
-        return _source;
-    }
-
-    public NamedAddressSpace getAddressSpace()
-    {
-        return getSession().getConnection().getAddressSpace();
-    }
-
-    public void setSource(final BaseSource source)
-    {
-        _source = source;
+        return _link.getSource();
     }
 
     public BaseTarget getTarget()
     {
-        return _target;
+        return _link.getTarget();
     }
 
-    public void setTarget(final BaseTarget target)
+    public NamedAddressSpace getAddressSpace()
     {
-        _target = target;
+        return getSession().getConnection().getAddressSpace();
     }
 
     public void setDeliveryCount(final UnsignedInteger deliveryCount)
@@ -199,12 +199,6 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    protected abstract void remoteDetachedPerformDetach(final Detach detach);
-
-    public void receiveFlow(final Flow flow)
-    {
-    }
-
     public void addUnsettled(final Delivery unsettled)
     {
     }
@@ -221,8 +215,6 @@ public abstract class LinkEndpoint<T ext
         }
     }
 
-    protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
     public void settle(final Binary deliveryTag)
     {
 
@@ -233,51 +225,6 @@ public abstract class LinkEndpoint<T ext
         _localHandle = localHandle;
     }
 
-    void receiveAttach(final Attach attach)
-    {
-        switch (_state)
-        {
-            case ATTACH_SENT:
-            {
-
-                _state = State.ATTACHED;
-
-                _initialUnsettledMap = attach.getUnsettled();
-                    /*  TODO - don't yet handle:
-
-                        attach.getProperties();
-                        attach.getDurable();
-                        attach.getExpiryPolicy();
-                        attach.getTimeout();
-                     */
-
-                break;
-            }
-
-            case DETACHED:
-            {
-                _state = State.ATTACH_RECVD;
-                break;
-            }
-
-
-        }
-
-        if (attach.getRole() == Role.SENDER)
-        {
-            _source = attach.getSource();
-        }
-        else
-        {
-            _target = attach.getTarget();
-        }
-
-        if (getRole() == Role.SENDER)
-        {
-            _maxMessageSize = attach.getMaxMessageSize();
-        }
-    }
-
     boolean isAttached()
     {
         return _state == State.ATTACHED;
@@ -293,21 +240,32 @@ public abstract class LinkEndpoint<T ext
         return _session;
     }
 
-    public void setSession(final Session_1_0 session)
+    public void associateSession(final Session_1_0 session)
     {
+        if (session == null) // a null session is rejected; the association is cleared via dissociateSession()
+        {
+            throw new IllegalStateException("To dissociate session from Endpoint call LinkEndpoint#dissociateSession() "
+                                            + "instead of LinkEndpoint#associateSession(null)");
+        }
         _session = session;
     }
 
+    public void dissociateSession() // counterpart of associateSession(Session_1_0): clears handle and session
+    {
+        setLocalHandle(null);
+        _session = null;
+        getLink().discardEndpoint(); // NOTE(review): presumably makes the link drop this endpoint - confirm in the Link_1_0 implementations
+    }
+
     UnsignedInteger getLocalHandle()
     {
         return _localHandle;
     }
 
-
     public void attach()
     {
         Attach attachToSend = new Attach();
-        attachToSend.setName(getName());
+        attachToSend.setName(getLinkName());
         attachToSend.setRole(getRole());
         attachToSend.setHandle(getLocalHandle());
         attachToSend.setSource(getSource());
@@ -332,14 +290,13 @@ public abstract class LinkEndpoint<T ext
                 _state = State.ATTACHED;
                 break;
             default:
-                // TODO ERROR
+                throw new UnsupportedOperationException(_state.toString());
         }
 
         getSession().sendAttach(attachToSend);
 
     }
 
-
     public void detach()
     {
         detach(null, false);
@@ -350,14 +307,14 @@ public abstract class LinkEndpoint<T ext
         detach(null, true);
     }
 
-    public void close(Error error)
+    public void detach(Error error)
     {
-        detach(error, true);
+        detach(error, false);
     }
 
-    public void detach(Error error)
+    public void close(Error error)
     {
-        detach(error, false);
+        detach(error, true);
     }
 
     private void detach(Error error, boolean close)
@@ -375,20 +332,26 @@ public abstract class LinkEndpoint<T ext
                 return;
         }
 
-        if (!(getSession().getSessionState() == SessionState.END_RECVD || getSession().isEnded()))
+        if (getSession().getSessionState() != SessionState.END_RECVD && !getSession().isEnded())
         {
             Detach detach = new Detach();
             detach.setHandle(getLocalHandle());
             if (close)
+            {
                 detach.setClosed(close);
+            }
             detach.setError(error);
 
             getSession().sendDetach(detach);
         }
-    }
-
-
 
+        if (close)
+        {
+            dissociateSession();
+            _link.linkClosed();
+        }
+        setLocalHandle(null);
+    }
 
     public void setTransactionId(final Object txnId)
     {
@@ -477,16 +440,6 @@ public abstract class LinkEndpoint<T ext
         return _link;
     }
 
-    public void setLink(final T link)
-    {
-        _link = link;
-    }
-
-    public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
-    {
-        _sendingSettlementMode = sendingSettlementMode;
-    }
-
     public SenderSettleMode getSendingSettlementMode()
     {
         return _sendingSettlementMode;
@@ -497,11 +450,6 @@ public abstract class LinkEndpoint<T ext
         return _receivingSettlementMode;
     }
 
-    public void setReceivingSettlementMode(ReceiverSettleMode receivingSettlementMode)
-    {
-        _receivingSettlementMode = receivingSettlementMode;
-    }
-
     public List<Symbol> getCapabilities()
     {
         return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
@@ -517,9 +465,6 @@ public abstract class LinkEndpoint<T ext
         return _initialUnsettledMap;
     }
 
-
-    public abstract void flowStateChanged();
-
     public void setLocalUnsettled(Map unsettled)
     {
         _localUnsettled = unsettled;
@@ -528,12 +473,12 @@ public abstract class LinkEndpoint<T ext
     @Override public String toString()
     {
         return "LinkEndpoint{" +
-               "_name='" + _name + '\'' +
+               "_name='" + getLinkName() + '\'' +
                ", _session=" + _session +
                ", _state=" + _state +
                ", _role=" + getRole() +
-               ", _source=" + _source +
-               ", _target=" + _target +
+               ", _source=" + getSource() +
+               ", _target=" + getTarget() +
                ", _transferCount=" + _deliveryCount +
                ", _linkCredit=" + _linkCredit +
                ", _available=" + _available +

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java (from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java Mon Mar  6 15:06:57 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,24 +15,27 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
+
 package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
+import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
 
-public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
+@PluggableService
+public class LinkRegistryFactoryImpl implements LinkRegistryFactory
 {
-    public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint)
+    @Override
+    public String getType()
     {
-        ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+        return "org.apache.qpid.LinkRegistryFactory.amqp_1_0";
     }
 
-
     @Override
-    public void setLinkAttachmentToNull()
+    public LinkRegistry create(final NamedAddressSpace addressSpace)
     {
+        return new LinkRegistryImpl(addressSpace);
     }
-
 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1785660&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Mon Mar  6 15:06:57 2017
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
+
+public class LinkRegistryImpl implements LinkRegistry
+{
+    private final Map<String, Map<String, SendingLink_1_0>> _sendingLinkRegistry = new HashMap<>();
+    private final Map<String, Map<String, StandardReceivingLink_1_0>> _receivingLinkRegistry = new HashMap<>();
+    private final Map<String, Map<String, TxnCoordinatorReceivingLink_1_0>> _coordinatorLinkRegistry = new HashMap<>();
+    private final NamedAddressSpace _addressSpace;
+
+    LinkRegistryImpl(final NamedAddressSpace addressSpace)
+    {
+        _addressSpace = addressSpace;
+    }
+
+    @Override
+    public synchronized <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+    {
+        if (SendingLink_1_0.class.equals(type))
+        {
+            return (T) getSendingLink(remoteContainerId, linkName);
+        }
+        else if (StandardReceivingLink_1_0.class.equals(type))
+        {
+            return (T) getReceivingLink(remoteContainerId, linkName);
+        }
+        else if (TxnCoordinatorReceivingLink_1_0.class.equals(type))
+        {
+            return (T) getCoordinatorLink(remoteContainerId, linkName);
+        }
+        else
+        {
+            throw new ConnectionScopedRuntimeException(String.format("Unsupported link type: '%s'", type.getSimpleName()));
+        }
+    }
+
+    private TxnCoordinatorReceivingLink_1_0 getCoordinatorLink(final String remoteContainerId, final String linkName)
+    {
+        Map<String, TxnCoordinatorReceivingLink_1_0> containerRegistry = _coordinatorLinkRegistry.get(remoteContainerId);
+        if (containerRegistry == null)
+        {
+            containerRegistry = new HashMap<>();
+            _coordinatorLinkRegistry.put(remoteContainerId, containerRegistry);
+        }
+        TxnCoordinatorReceivingLink_1_0 link = containerRegistry.get(linkName);
+        if (link == null)
+        {
+            link = new TxnCoordinatorReceivingLink_1_0(linkName);
+            containerRegistry.put(linkName, link);
+        }
+        return link;
+    }
+
+    private SendingLink_1_0 getSendingLink(final String remoteContainerId, final String linkName)
+    {
+        Map<String, SendingLink_1_0> containerRegistry = _sendingLinkRegistry.get(remoteContainerId);
+        if (containerRegistry == null)
+        {
+            containerRegistry = new HashMap<>();
+            _sendingLinkRegistry.put(remoteContainerId, containerRegistry);
+        }
+        SendingLink_1_0 link = containerRegistry.get(linkName);
+        if (link == null)
+        {
+            link = new SendingLink_1_0(linkName);
+            containerRegistry.put(linkName, link);
+        }
+        return link;
+    }
+
+    private StandardReceivingLink_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
+    {
+        Map<String, StandardReceivingLink_1_0> containerRegistry = _receivingLinkRegistry.get(remoteContainerId);
+        if (containerRegistry == null)
+        {
+            containerRegistry = new HashMap<>();
+            _receivingLinkRegistry.put(remoteContainerId, containerRegistry);
+        }
+        StandardReceivingLink_1_0 link = containerRegistry.get(linkName);
+        if (link == null)
+        {
+            link = new StandardReceivingLink_1_0(linkName);
+            containerRegistry.put(linkName, link);
+        }
+        return link;
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Mon Mar  6 15:06:57 2017
@@ -20,12 +20,24 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 
 public interface Link_1_0 extends LinkModel
 {
+    ListenableFuture<? extends LinkEndpoint<?>> attach(Session_1_0 session, final Attach attach); // asynchronous: the endpoint arrives through the returned future
+
+    void linkClosed(); // invoked by LinkEndpoint when it detaches with close == true
+
+    void discardEndpoint(); // invoked from LinkEndpoint#dissociateSession()
+
+    String getName(); // returned by LinkEndpoint#getLinkName()
+
+    BaseSource getSource(); // returned by LinkEndpoint#getSource()
 
+    BaseTarget getTarget(); // returned by LinkEndpoint#getTarget()
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java Mon Mar  6 15:06:57 2017
@@ -25,12 +25,12 @@ import static org.apache.qpid.server.mod
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.message.MessageSource;
 
 public class MessageSourceDestination implements SendingDestination
 {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Mon Mar  6 15:06:57 2017
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
+
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -222,8 +224,9 @@ public class NodeReceivingDestination im
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[1];
+        Symbol[] capabilities = new Symbol[2];
         capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+        capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Mon Mar  6 15:06:57 2017
@@ -25,7 +25,6 @@ import java.util.Collections;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Mon Mar  6 15:06:57 2017
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -43,6 +44,7 @@ public abstract class ReceivingLinkEndpo
 
     private UnsignedInteger _lastDeliveryId;
     private ReceivingDestination _receivingDestination;
+    private final SectionDecoder _sectionDecoder;
 
     private static class TransientState
     {
@@ -89,12 +91,10 @@ public abstract class ReceivingLinkEndpo
     private UnsignedInteger _drainLimit;
 
 
-    public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
+    public ReceivingLinkEndpoint(final ReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
     {
-        super(session, attach);
-        setDeliveryCount(attach.getInitialDeliveryCount());
-        setSendingSettlementMode(attach.getSndSettleMode());
-        setReceivingSettlementMode(attach.getRcvSettleMode());
+        super(link);
+        _sectionDecoder = sectionDecoder;
     }
 
     @Override
@@ -168,7 +168,6 @@ public abstract class ReceivingLinkEndpo
 
     @Override public void receiveFlow(final Flow flow)
     {
-        super.receiveFlow(flow);
         _remoteDrain = Boolean.TRUE.equals((Boolean) flow.getDrain());
         setAvailable(flow.getAvailable());
         setDeliveryCount(flow.getDeliveryCount());
@@ -283,9 +282,10 @@ public abstract class ReceivingLinkEndpo
         }
     }
 
-
-
-
+    SectionDecoder getSectionDecoder()
+    {
+        return _sectionDecoder;
+    }
 
     @Override
     public void settle(Binary deliveryTag)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Mon Mar  6 15:06:57 2017
@@ -20,10 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
 public interface ReceivingLink_1_0 extends Link_1_0
 {
-    void setLinkAttachmentToNull();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Mon Mar  6 15:06:57 2017
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -95,12 +96,12 @@ public class SendingLinkEndpoint extends
     private ConsumerTarget_1_0 _consumerTarget;
     private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
 
-    public SendingLinkEndpoint(final Session_1_0 session, final Attach attach)
+    public SendingLinkEndpoint(final SendingLink_1_0 link)
     {
-        super(session, attach);
-        setSendingSettlementMode(attach.getSndSettleMode());
-        setReceivingSettlementMode(attach.getRcvSettleMode());
-        init();
+        super(link);
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setAvailable(UnsignedInteger.valueOf(0));
+        setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
     }
 
     @Override
@@ -108,8 +109,9 @@ public class SendingLinkEndpoint extends
     {
     }
 
-    public void doStuff(final SendingDestination destination) throws AmqpErrorException
+    public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
     {
+        // TODO FIXME: This method might modify the source, which is not good encapsulation. Furthermore, if it does so, it should inform the link/link registry about it!
         _destination = destination;
         final Source source = (Source) getSource();
 
@@ -206,11 +208,11 @@ public class SendingLinkEndpoint extends
             if(getTarget() instanceof Target)
             {
                 Target target = (Target) getTarget();
-                name = target.getAddress() == null ? getName() : target.getAddress();
+                name = target.getAddress() == null ? getLinkName() : target.getAddress();
             }
             else
             {
-                name = getName();
+                name = getLinkName();
             }
 
             _consumer = _destination.getMessageSource()
@@ -284,12 +286,6 @@ public class SendingLinkEndpoint extends
         }
     }
 
-    private void init()
-    {
-        setDeliveryCount(UnsignedInteger.valueOf(0));
-        setAvailable(UnsignedInteger.valueOf(0));
-    }
-
     @Override
     public Role getRole()
     {
@@ -301,9 +297,9 @@ public class SendingLinkEndpoint extends
         return _priority;
     }
 
-    public void setDurability(final TerminusDurability durability)
+    public TerminusDurability getTerminusDurability()
     {
-        _durability = durability;
+        return getLink().getLocalTerminusDurability();
     }
 
     public boolean transfer(final Transfer xfr, final boolean decrementCredit)
@@ -358,7 +354,6 @@ public class SendingLinkEndpoint extends
     @Override
     public void receiveFlow(final Flow flow)
     {
-        super.receiveFlow(flow);
         UnsignedInteger t = flow.getDeliveryCount();
         UnsignedInteger c = flow.getLinkCredit();
         setDrain(flow.getDrain());
@@ -423,8 +418,8 @@ public class SendingLinkEndpoint extends
         //TODO
         // if not durable or close
         if (Boolean.TRUE.equals(detach.getClosed())
-            || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals(
-                _durability)))
+            || !(TerminusDurability.UNSETTLED_STATE.equals(getTerminusDurability())
+                 || TerminusDurability.CONFIGURATION.equals(getTerminusDurability())))
         {
 
             Modified state = new Modified();
@@ -436,11 +431,9 @@ public class SendingLinkEndpoint extends
             }
             _unsettledActionMap.clear();
 
-            close();
-
             if (getDestination() instanceof ExchangeDestination
-               && (_durability == TerminusDurability.CONFIGURATION
-                   || _durability == TerminusDurability.UNSETTLED_STATE))
+               && (getTerminusDurability() == TerminusDurability.CONFIGURATION
+                   || getTerminusDurability() == TerminusDurability.UNSETTLED_STATE))
             {
                 try
                 {
@@ -452,30 +445,25 @@ public class SendingLinkEndpoint extends
                 catch (AccessControlException e)
                 {
                     LOGGER.error("Error unregistering subscription", e);
-                    detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
+                    close(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
                 }
                 catch (IllegalStateException e)
                 {
-                    detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
+                    close(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
                 }
                 catch (NotFoundException e)
                 {
-                    detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
+                    close(new Error(AmqpError.NOT_FOUND, e.getMessage()));
                 }
             }
+
+            close();
         }
         else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
         {
-            try
-            {
-                getLink().setLinkAttachment(null, null);
-            }
-            catch (AmqpErrorException e)
-            {
-                throw new ConnectionScopedRuntimeException(e);
-            }
-            getConsumerTarget().flowStateChanged();
             detach();
+            dissociateSession();
+            getConsumerTarget().updateNotifyWorkDesired();
         }
         else
         {
@@ -553,66 +541,45 @@ public class SendingLinkEndpoint extends
         return _transactionId;
     }
 
-    public void doLinkAttachment(final Session_1_0 session, final MessageInstanceConsumer consumer) throws AmqpErrorException
+    @Override
+    public void attachReceived(final Attach attach) throws AmqpErrorException
     {
-        if (session != null)
-        {
-            createConsumerTarget();
-
-            setSession(session);
-            _resumeAcceptedTransfers.clear();
-            _resumeFullTransfers.clear();
-            final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
-            Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
-            Map initialUnsettledMap = getInitialUnsettledMap();
+        super.attachReceived(attach);
+        final MessageInstanceConsumer consumer = getConsumer();
+        createConsumerTarget();
+        _resumeAcceptedTransfers.clear();
+        _resumeFullTransfers.clear();
+        final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+        Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
+        Map initialUnsettledMap = getInitialUnsettledMap();
 
-            for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+        for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+        {
+            Binary deliveryTag = entry.getKey();
+            final MessageInstance queueEntry = entry.getValue();
+            if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
-                Binary deliveryTag = entry.getKey();
-                final MessageInstance queueEntry = entry.getValue();
-                if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
-                {
-                    queueEntry.setRedelivered();
-                    queueEntry.release(consumer);
-                    _unsettledMap2.remove(deliveryTag);
-                }
-                else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
-                {
-                    Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+                queueEntry.setRedelivered();
+                queueEntry.release(consumer);
+                _unsettledMap2.remove(deliveryTag);
+            }
+            else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+            {
+                Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
 
-                    if (outcome instanceof Accepted)
-                    {
-                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
-                        if (consumer.acquires())
-                        {
-                            if (queueEntry.acquire() || queueEntry.isAcquired())
-                            {
-                                txn.dequeue(Collections.singleton(queueEntry),
-                                            new ServerTransaction.Action()
-                                            {
-                                                public void postCommit()
-                                                {
-                                                    queueEntry.delete();
-                                                }
-
-                                                public void onRollback()
-                                                {
-                                                }
-                                            });
-                            }
-                        }
-                    }
-                    else if (outcome instanceof Released)
+                if (outcome instanceof Accepted)
+                {
+                    AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+                    if (consumer.acquires())
                     {
-                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
-                        if (consumer.acquires())
+                        if (queueEntry.acquire() || queueEntry.isAcquired())
                         {
                             txn.dequeue(Collections.singleton(queueEntry),
                                         new ServerTransaction.Action()
                                         {
                                             public void postCommit()
                                             {
-                                                queueEntry.release(consumer);
+                                                queueEntry.delete();
                                             }
 
                                             public void onRollback()
@@ -621,16 +588,34 @@ public class SendingLinkEndpoint extends
                                         });
                         }
                     }
-                    //_unsettledMap.remove(deliveryTag);
-                    initialUnsettledMap.remove(deliveryTag);
-                    _resumeAcceptedTransfers.add(deliveryTag);
                 }
-                else
+                else if (outcome instanceof Released)
                 {
-                    _resumeFullTransfers.add(queueEntry);
-                    // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+                    AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+                    if (consumer.acquires())
+                    {
+                        txn.dequeue(Collections.singleton(queueEntry),
+                                    new ServerTransaction.Action()
+                                    {
+                                        public void postCommit()
+                                        {
+                                            queueEntry.release(consumer);
+                                        }
+
+                                        public void onRollback()
+                                        {
+                                        }
+                                    });
+                    }
                 }
-                // TODO - else
+                //_unsettledMap.remove(deliveryTag);
+                initialUnsettledMap.remove(deliveryTag);
+                _resumeAcceptedTransfers.add(deliveryTag);
+            }
+            else
+            {
+                _resumeFullTransfers.add(queueEntry);
+                // exists in receivers map, but not yet got an outcome ... should resend with resume = true
             }
         }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Mar  6 15:06:57 2017
@@ -20,28 +20,197 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Collections;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 
-public class SendingLink_1_0 implements Link_1_0
+public class SendingLink_1_0 extends AbstractLink<SendingLinkEndpoint>
 {
+    public SendingLink_1_0(final String linkName)
+    {
+        super(linkName);
+    }
+
+    @Override
+    protected ListenableFuture<SendingLinkEndpoint> stealLink(final Session_1_0 session, final Attach attach)
+    {
+        throw new UnsupportedOperationException("Link stealing is not implemented yet.");
+        /*
+        final SettableFuture<SendingLinkEndpoint> returnFuture = SettableFuture.create();
+        _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                _linkEndpoint.close(new Error(LinkError.STOLEN,
+                                              String.format("Link is being stolen by connection '%s'",
+                                                            session.getConnection())));
+                try
+                {
+                    returnFuture.set(attach(session, attach).get());
+                }
+                catch (InterruptedException e)
+                {
+                    returnFuture.setException(e);
+                    Thread.currentThread().interrupt();
+                }
+                catch (ExecutionException e)
+                {
+                    returnFuture.setException(e.getCause());
+                }
+            }
+        });
+        return returnFuture;
+        */
+    }
 
-    private volatile SendingLinkEndpoint _linkEndpoint;
+    @Override
+    protected ListenableFuture<SendingLinkEndpoint> reattachLink(final Session_1_0 session, final Attach attach)
+    {
+        if (_linkEndpoint == null)
+        {
+            _linkEndpoint = new SendingLinkEndpoint(this);
+        }
+
+        _source = new Source();
+        _target = attach.getTarget();
+
+        try
+        {
+            _linkEndpoint.associateSession(session);
+
+            Source attachSource = (Source) attach.getSource();
+            final SendingDestination destination = session.getSendingDestination(attach.getName(), attachSource);
+            ((Source) getSource()).setAddress(attachSource.getAddress());
+            ((Source) getSource()).setDynamic(attachSource.getDynamic());
+            ((Source) getSource()).setDurable(attachSource.getDurable());
+            ((Source) getSource()).setExpiryPolicy(attachSource.getExpiryPolicy());
+            ((Source) getSource()).setDistributionMode(attachSource.getDistributionMode());
+            ((Source) getSource()).setFilter(attachSource.getFilter());
+            ((Source) getSource()).setCapabilities(destination.getCapabilities());
+            _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+            _linkEndpoint.attachReceived(attach);
+
+            if (destination instanceof ExchangeDestination)
+            {
+                ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+                exchangeDestination.getQueue()
+                                   .setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE,
+                                                                                           State.ACTIVE));
+            }
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
 
+        return Futures.immediateFuture(_linkEndpoint);
+    }
 
-    public SendingLink_1_0(final SendingLinkEndpoint linkEndpoint)
+    @Override
+    protected ListenableFuture<SendingLinkEndpoint> resumeLink(final Session_1_0 session, final Attach attach)
     {
-        _linkEndpoint = linkEndpoint;
+        if (getSource() == null)
+        {
+            throw new IllegalStateException("Terminus should be set when resuming a Link.");
+        }
+        if (attach.getSource() == null)
+        {
+            throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+        }
+
+        Source newSource = (Source) attach.getSource();
+        Source oldSource = (Source) getSource();
+
+        try
+        {
+            if (_linkEndpoint == null)
+            {
+                _linkEndpoint = new SendingLinkEndpoint(this);
+
+                final SendingDestination destination = session.getSendingDestination(getName(), oldSource);
+                _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+            }
+
+            if (_linkEndpoint.getDestination() instanceof ExchangeDestination
+                && !Boolean.TRUE.equals(newSource.getDynamic()))
+            {
+                final SendingDestination newDestination =
+                        session.getSendingDestination(_linkEndpoint.getLinkName(), newSource);
+                if (session.updateSourceForSubscription(_linkEndpoint, newSource, newDestination))
+                {
+                    _linkEndpoint.setDestination(newDestination);
+                }
+            }
+
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+
+            _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+        return Futures.immediateFuture(_linkEndpoint);
     }
 
-    public SendingLinkEndpoint getEndpoint()
+    @Override
+    protected ListenableFuture<SendingLinkEndpoint> recoverLink(final Session_1_0 session, final Attach attach)
     {
-        return _linkEndpoint;
+        if (_source == null)
+        {
+            return rejectLink(session, attach);
+        }
+
+        _target = attach.getTarget();
+
+        try
+        {
+            if (_linkEndpoint == null)
+            {
+                _linkEndpoint = new SendingLinkEndpoint(this);
+
+                final SendingDestination destination = session.getSendingDestination(getName(), (Source) _source);
+                _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+            }
+
+            _linkEndpoint.associateSession(session);
+            _linkEndpoint.attachReceived(attach);
+
+            _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+        }
+        catch (AmqpErrorException e)
+        {
+            rejectLink(session, attach);
+        }
+
+        return Futures.immediateFuture(_linkEndpoint);
+    }
+
+    @Override
+    protected ListenableFuture<SendingLinkEndpoint> establishLink(final Session_1_0 session, final Attach attach)
+    {
+        if (_linkEndpoint != null || getSource() != null)
+        {
+            throw new IllegalStateException("LinkEndpoint and Source should be null when establishing a Link.");
+        }
+
+        return reattachLink(session, attach);
     }
 
-    public synchronized void setLinkAttachment(final Session_1_0 session,
-                                               final SendingLinkEndpoint linkEndpoint) throws AmqpErrorException
+    private ListenableFuture<SendingLinkEndpoint> rejectLink(final Session_1_0 session, final Attach attach)
     {
-        _linkEndpoint = linkEndpoint;
-        _linkEndpoint.doLinkAttachment(session, getEndpoint().getConsumer());
+        _linkEndpoint = new SendingLinkEndpoint(this);
+        _linkEndpoint.associateSession(session);
+        _source = null;
+        return Futures.immediateFuture(_linkEndpoint);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org