You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/10 14:53:11 UTC

svn commit: r1786358 - in /qpid/java/trunk: broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ test-profiles/

Author: lquack
Date: Fri Mar 10 14:53:11 2017
New Revision: 1786358

URL: http://svn.apache.org/viewvc?rev=1786358&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry. Propagate errors during attach back to remote endpoint.

Added:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
      - copied, changed from r1786342, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/test-profiles/JavaBDBExcludes

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java (from r1786342, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java&r1=1786342&r2=1786358&rev=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -47,9 +47,9 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
-public abstract class LinkEndpoint
+public abstract class AbstractLinkEndpoint implements LinkEndpoint
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
     private final Link_1_0 _link;
     private final Session_1_0 _session;
     private Object _flowTransactionId;
@@ -82,20 +82,12 @@ public abstract class LinkEndpoint
     }
 
 
-    LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
+    AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
     {
         _session = session;
         _link = link;
     }
 
-    public abstract void start();
-
-    public abstract Role getRole();
-
-    public abstract void flowStateChanged();
-
-    public abstract void receiveFlow(final Flow flow);
-
     protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
 
     protected abstract void remoteDetachedPerformDetach(final Detach detach);
@@ -103,6 +95,7 @@ public abstract class LinkEndpoint
     protected abstract Map<Symbol,Object> initProperties(final Attach attach);
 
 
+    @Override
     public void receiveAttach(final Attach attach) throws AmqpErrorException
     {
         boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
@@ -148,6 +141,7 @@ public abstract class LinkEndpoint
         return _stopped;
     }
 
+    @Override
     public void setStopped(final boolean stopped)
     {
         if(_stopped != stopped)
@@ -163,11 +157,13 @@ public abstract class LinkEndpoint
         return _link.getName();
     }
 
+    @Override
     public BaseSource getSource()
     {
         return _link.getSource();
     }
 
+    @Override
     public BaseTarget getTarget()
     {
         return _link.getTarget();
@@ -218,6 +214,7 @@ public abstract class LinkEndpoint
         return _linkCredit;
     }
 
+    @Override
     public void remoteDetached(final Detach detach)
     {
         switch (_state)
@@ -236,6 +233,7 @@ public abstract class LinkEndpoint
     {
     }
 
+    @Override
     public void receiveDeliveryState(final Delivery unsettled,
                                      final DeliveryState state,
                                      final Boolean settled)
@@ -253,7 +251,8 @@ public abstract class LinkEndpoint
 
     }
 
-    void setLocalHandle(final UnsignedInteger localHandle)
+    @Override
+    public void setLocalHandle(final UnsignedInteger localHandle)
     {
         _localHandle = localHandle;
     }
@@ -268,23 +267,27 @@ public abstract class LinkEndpoint
         return _state == State.DETACHED || _session.isEnded();
     }
 
+    @Override
     public Session_1_0 getSession()
     {
         return _session;
     }
 
+    @Override
     public void destroy()
     {
         setLocalHandle(null);
         getLink().discardEndpoint();
     }
 
-    UnsignedInteger getLocalHandle()
+    @Override
+    public UnsignedInteger getLocalHandle()
     {
         return _localHandle;
     }
 
-    public void attach()
+    @Override
+    public void sendAttach()
     {
         Attach attachToSend = new Attach();
         attachToSend.setName(getLinkName());
@@ -334,6 +337,7 @@ public abstract class LinkEndpoint
         detach(error, false);
     }
 
+    @Override
     public void close(Error error)
     {
         detach(error, true);
@@ -351,8 +355,10 @@ public abstract class LinkEndpoint
                 _state = State.DETACHED;
                 break;
             default:
+                // "silent link stealing"
                 if (close)
                 {
+                    getSession().dissociateEndpoint(this);
                     destroy();
                     _link.linkClosed();
                 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java?rev=1786358&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class ErrantLinkEndpoint implements LinkEndpoint
+{
+    private final Link_1_0 _link;
+    private final Session_1_0 _session;
+    private Error _error;
+    private UnsignedInteger _localHandle;
+
+    ErrantLinkEndpoint(Link_1_0 link, Session_1_0 session, Error error)
+    {
+        _link = link;
+        _session = session;
+        _error = error;
+    }
+
+    @Override
+    public Role getRole()
+    {
+        return _link.getRole();
+    }
+
+    @Override
+    public BaseSource getSource()
+    {
+        return null;
+    }
+
+    @Override
+    public BaseTarget getTarget()
+    {
+        return null;
+    }
+
+    @Override
+    public Session_1_0 getSession()
+    {
+        return _session;
+    }
+
+    @Override
+    public UnsignedInteger getLocalHandle()
+    {
+        return _localHandle;
+    }
+
+    @Override
+    public void setLocalHandle(final UnsignedInteger localHandle)
+    {
+        _localHandle = localHandle;
+    }
+
+    @Override
+    public void sendAttach()
+    {
+        Attach attachToSend = new Attach();
+        attachToSend.setName(_link.getName());
+        attachToSend.setRole(getRole());
+        attachToSend.setHandle(getLocalHandle());
+        attachToSend.setSource(getSource());
+        attachToSend.setTarget(getTarget());
+        _session.sendAttach(attachToSend);
+    }
+
+    @Override
+    public void receiveAttach(final Attach attach) throws AmqpErrorException
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
+    public void destroy()
+    {
+        setLocalHandle(null);
+        _link.discardEndpoint();
+    }
+
+    public void closeWithError()
+    {
+        close(_error);
+    }
+
+    @Override
+    public void close(final Error error)
+    {
+        Detach detach = new Detach();
+        detach.setHandle(_localHandle);
+        detach.setClosed(true);
+        detach.setError(error);
+        _session.sendDetach(detach);
+        _session.dissociateEndpoint(this);
+        destroy();
+        _link.linkClosed();
+    }
+
+    @Override
+    public void remoteDetached(final Detach detach)
+    {
+        // ignore
+    }
+
+    @Override
+    public void receiveFlow(final Flow flow)
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
+    public void flowStateChanged()
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
+    public void start()
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
+    public void setStopped(final boolean stopped)
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
+    public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,499 +15,54 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
-public abstract class LinkEndpoint
+public interface LinkEndpoint
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
-    private final Link_1_0 _link;
-    private final Session_1_0 _session;
-    private Object _flowTransactionId;
-    private SenderSettleMode _sendingSettlementMode;
-    private ReceiverSettleMode _receivingSettlementMode;
-    private Map _initialUnsettledMap;
-    private UnsignedInteger _lastSentCreditLimit;
-    private volatile boolean _stopped;
-    private volatile boolean _stoppedUpdated;
-    private Symbol[] _capabilities;
-    private UnsignedInteger _deliveryCount;
-    private UnsignedInteger _linkCredit;
-    private UnsignedInteger _available;
-    private Boolean _drain;
-    private UnsignedInteger _localHandle;
-    private UnsignedLong _maxMessageSize;
-    private Map<Symbol, Object> _properties;
-
-    protected volatile State _state = State.ATTACH_RECVD;
-    protected Map _localUnsettled;
-
-    protected enum State
-    {
-        DETACHED,
-        ATTACH_SENT,
-        ATTACH_RECVD,
-        ATTACHED,
-        DETACH_SENT,
-        DETACH_RECVD
-    }
-
-
-    LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
-    {
-        _session = session;
-        _link = link;
-    }
-
-    public abstract void start();
-
-    public abstract Role getRole();
-
-    public abstract void flowStateChanged();
-
-    public abstract void receiveFlow(final Flow flow);
-
-    protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
-    protected abstract void remoteDetachedPerformDetach(final Detach detach);
-
-    protected abstract Map<Symbol,Object> initProperties(final Attach attach);
-
-
-    public void receiveAttach(final Attach attach) throws AmqpErrorException
-    {
-        boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
-        boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
-
-        if (isAttachingLocalTerminusNull)
-        {
-            recoverLink(attach);
-        }
-        else if (isLocalTerminusNull)
-        {
-            establishLink(attach);
-        }
-        else if (attach.getUnsettled() != null)
-        {
-            resumeLink(attach);
-        }
-        else
-        {
-            reattachLink(attach);
-        }
-    }
-
-    protected abstract void reattachLink(final Attach attach) throws AmqpErrorException;
-
-    protected abstract void resumeLink(final Attach attach) throws AmqpErrorException;
-
-    protected abstract void establishLink(final Attach attach) throws AmqpErrorException;
-
-    protected abstract void recoverLink(final Attach attach) throws AmqpErrorException;
-
-    public void attachReceived(final Attach attach) throws AmqpErrorException
-    {
-        _sendingSettlementMode = attach.getSndSettleMode();
-        _receivingSettlementMode = attach.getRcvSettleMode();
-        _initialUnsettledMap = attach.getUnsettled();
-        _properties = initProperties(attach);
-        _state = State.ATTACH_RECVD;
-    }
-
-    public boolean isStopped()
-    {
-        return _stopped;
-    }
-
-    public void setStopped(final boolean stopped)
-    {
-        if(_stopped != stopped)
-        {
-            _stopped = stopped;
-            _stoppedUpdated = true;
-            sendFlowConditional();
-        }
-    }
-
-    public String getLinkName()
-    {
-        return _link.getName();
-    }
-
-    public BaseSource getSource()
-    {
-        return _link.getSource();
-    }
-
-    public BaseTarget getTarget()
-    {
-        return _link.getTarget();
-    }
-
-    public NamedAddressSpace getAddressSpace()
-    {
-        return getSession().getConnection().getAddressSpace();
-    }
-
-    public void setDeliveryCount(final UnsignedInteger deliveryCount)
-    {
-        _deliveryCount = deliveryCount;
-    }
-
-    public void setLinkCredit(final UnsignedInteger linkCredit)
-    {
-        _linkCredit = linkCredit;
-    }
-
-    public void setAvailable(final UnsignedInteger available)
-    {
-        _available = available;
-    }
-
-    public void setDrain(final Boolean drain)
-    {
-        _drain = drain;
-    }
-
-    public UnsignedInteger getDeliveryCount()
-    {
-        return _deliveryCount;
-    }
-
-    public UnsignedInteger getAvailable()
-    {
-        return _available;
-    }
-
-    public Boolean getDrain()
-    {
-        return _drain;
-    }
-
-    public UnsignedInteger getLinkCredit()
-    {
-        return _linkCredit;
-    }
-
-    public void remoteDetached(final Detach detach)
-    {
-        switch (_state)
-        {
-            case DETACH_SENT:
-                _state = State.DETACHED;
-                break;
-            case ATTACHED:
-                _state = State.DETACH_RECVD;
-                remoteDetachedPerformDetach(detach);
-                break;
-        }
-    }
-
-    public void addUnsettled(final Delivery unsettled)
-    {
-    }
-
-    public void receiveDeliveryState(final Delivery unsettled,
-                                     final DeliveryState state,
-                                     final Boolean settled)
-    {
-        handle(unsettled.getDeliveryTag(), state, settled);
-
-        if (Boolean.TRUE.equals(settled))
-        {
-            settle(unsettled.getDeliveryTag());
-        }
-    }
-
-    public void settle(final Binary deliveryTag)
-    {
-
-    }
-
-    void setLocalHandle(final UnsignedInteger localHandle)
-    {
-        _localHandle = localHandle;
-    }
-
-    boolean isAttached()
-    {
-        return _state == State.ATTACHED;
-    }
-
-    boolean isDetached()
-    {
-        return _state == State.DETACHED || _session.isEnded();
-    }
-
-    public Session_1_0 getSession()
-    {
-        return _session;
-    }
-
-    public void destroy()
-    {
-        setLocalHandle(null);
-        getLink().discardEndpoint();
-    }
-
-    UnsignedInteger getLocalHandle()
-    {
-        return _localHandle;
-    }
-
-    public void attach()
-    {
-        Attach attachToSend = new Attach();
-        attachToSend.setName(getLinkName());
-        attachToSend.setRole(getRole());
-        attachToSend.setHandle(getLocalHandle());
-        attachToSend.setSource(getSource());
-        attachToSend.setTarget(getTarget());
-        attachToSend.setSndSettleMode(getSendingSettlementMode());
-        attachToSend.setRcvSettleMode(getReceivingSettlementMode());
-        attachToSend.setUnsettled(_localUnsettled);
-        attachToSend.setProperties(_properties);
-        attachToSend.setOfferedCapabilities(_capabilities);
-
-        if (getRole() == Role.SENDER)
-        {
-            attachToSend.setInitialDeliveryCount(_deliveryCount);
-        }
-
-        switch (_state)
-        {
-            case DETACHED:
-                _state = State.ATTACH_SENT;
-                break;
-            case ATTACH_RECVD:
-                _state = State.ATTACHED;
-                break;
-            default:
-                throw new UnsupportedOperationException(_state.toString());
-        }
-
-        getSession().sendAttach(attachToSend);
-
-    }
-
-    public void detach()
-    {
-        detach(null, false);
-    }
-
-    public void close()
-    {
-        detach(null, true);
-    }
-
-    public void detach(Error error)
-    {
-        detach(error, false);
-    }
-
-    public void close(Error error)
-    {
-        detach(error, true);
-    }
-
-    private void detach(Error error, boolean close)
-    {
-        //TODO
-        switch (_state)
-        {
-            case ATTACHED:
-                _state = State.DETACH_SENT;
-                break;
-            case DETACH_RECVD:
-                _state = State.DETACHED;
-                break;
-            default:
-                if (close)
-                {
-                    destroy();
-                    _link.linkClosed();
-                }
-                return;
-        }
-
-        if (getSession().getSessionState() != SessionState.END_RECVD && !getSession().isEnded())
-        {
-            Detach detach = new Detach();
-            detach.setHandle(getLocalHandle());
-            if (close)
-            {
-                detach.setClosed(close);
-            }
-            detach.setError(error);
-
-            getSession().sendDetach(detach);
-        }
-
-        if (close)
-        {
-            destroy();
-            _link.linkClosed();
-        }
-        setLocalHandle(null);
-    }
-
-    public void setTransactionId(final Object txnId)
-    {
-        _flowTransactionId = txnId;
-    }
-
-    public void sendFlowConditional()
-    {
-        if(_lastSentCreditLimit != null)
-        {
-            if(_stoppedUpdated)
-            {
-                sendFlow(_flowTransactionId != null);
-                _stoppedUpdated = false;
-            }
-            else
-            {
-                UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
-
-                // client has used up over half their credit allowance ?
-                boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
-                if (sendFlow)
-                {
-                    sendFlow(_flowTransactionId != null);
-                }
-                else
-                {
-                    getSession().sendFlowConditional();
-                }
-            }
-        }
-        else
-        {
-            sendFlow(_flowTransactionId != null);
-        }
-    }
-
-
-    public void sendFlow()
-    {
-        sendFlow(_flowTransactionId != null);
-    }
-
-    public void sendFlowWithEcho()
-    {
-        sendFlow(_flowTransactionId != null, true);
-    }
-
-
-    public void sendFlow(boolean setTransactionId)
-    {
-        sendFlow(setTransactionId, false);
-    }
-
-    public void sendFlow(boolean setTransactionId, boolean echo)
-    {
-        if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
-        {
-            Flow flow = new Flow();
-            flow.setDeliveryCount(_deliveryCount);
-            flow.setEcho(echo);
-            if(_stopped)
-            {
-                flow.setLinkCredit(UnsignedInteger.ZERO);
-                flow.setDrain(true);
-                _lastSentCreditLimit = _deliveryCount;
-            }
-            else
-            {
-                flow.setLinkCredit(_linkCredit);
-                _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
-                flow.setDrain(_drain);
-            }
-            flow.setAvailable(_available);
-            if(setTransactionId)
-            {
-                flow.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));
-            }
-            flow.setHandle(getLocalHandle());
-            getSession().sendFlow(flow);
-        }
-    }
-
-    public Link_1_0 getLink()
-    {
-        return _link;
-    }
-
-    public SenderSettleMode getSendingSettlementMode()
-    {
-        return _sendingSettlementMode;
-    }
-
-    public ReceiverSettleMode getReceivingSettlementMode()
-    {
-        return _receivingSettlementMode;
-    }
-
-    public List<Symbol> getCapabilities()
-    {
-        return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
-    }
-
-    public void setCapabilities(Collection<Symbol> capabilities)
-    {
-        _capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
-    }
-
-    public Map getInitialUnsettledMap()
-    {
-        return _initialUnsettledMap;
-    }
-
-    public abstract void initialiseUnsettled();
-
-    @Override public String toString()
-    {
-        return "LinkEndpoint{" +
-               "_name='" + getLinkName() + '\'' +
-               ", _session=" + _session +
-               ", _state=" + _state +
-               ", _role=" + getRole() +
-               ", _source=" + getSource() +
-               ", _target=" + getTarget() +
-               ", _transferCount=" + _deliveryCount +
-               ", _linkCredit=" + _linkCredit +
-               ", _available=" + _available +
-               ", _drain=" + _drain +
-               ", _localHandle=" + _localHandle +
-               ", _maxMessageSize=" + _maxMessageSize +
-               '}';
-    }
+    Role getRole();
+
+    BaseSource getSource();
+
+    BaseTarget getTarget();
+
+    Session_1_0 getSession();
+
+    UnsignedInteger getLocalHandle();
+
+    void setLocalHandle(UnsignedInteger localHandle);
+
+    void receiveAttach(Attach attach) throws AmqpErrorException;
+
+    void sendAttach();
+
+    void remoteDetached(Detach detach);
+
+    void receiveDeliveryState(Delivery unsettled,
+                              DeliveryState state,
+                              Boolean settled);
+
+    void receiveFlow(Flow flow);
+
+    void flowStateChanged();
+
+    void start();
+
+    void setStopped(boolean stopped);
+
+    void destroy();
+
+    void close(Error error);
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Fri Mar 10 14:53:11 2017
@@ -27,10 +27,12 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -42,11 +44,12 @@ public class LinkImpl implements Link_1_
     private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
 
     private final String _linkName;
+
     private final Role _role;
+
     private volatile LinkEndpoint _linkEndpoint;
     private volatile BaseSource _source;
     private volatile BaseTarget _target;
-
     LinkImpl(final String linkName, final Role role)
     {
         _linkName = linkName;
@@ -54,13 +57,13 @@ public class LinkImpl implements Link_1_
     }
 
     @Override
-    public final ListenableFuture<LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
+    public final ListenableFuture<? extends LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
     {
         try
         {
             if (_role == attach.getRole())
             {
-                return rejectLink(session);
+                throw new AmqpErrorException(new Error(AmqpError.ILLEGAL_STATE, "Cannot switch SendingLink to ReceivingLink and vice versa"));
             }
 
 
@@ -82,12 +85,12 @@ public class LinkImpl implements Link_1_
                 }
 
                 _linkEndpoint.receiveAttach(attach);
-                return Futures.immediateFuture(_linkEndpoint);
+                return Futures.immediateFuture((LinkEndpoint) _linkEndpoint);
             }
         }
         catch (Throwable t)
         {
-            return rejectLink(session);
+            return rejectLink(session, t);
         }
     }
 
@@ -142,14 +145,20 @@ public class LinkImpl implements Link_1_
         return linkEndpoint;
     }
 
-
-    private ListenableFuture<LinkEndpoint> rejectLink(final Session_1_0 session)
+    private ListenableFuture<? extends LinkEndpoint> rejectLink(final Session_1_0 session, Throwable t)
     {
-        _linkEndpoint = new SendingLinkEndpoint(session, this);
-        _source = null;
+        if (t instanceof AmqpErrorException)
+        {
+            _linkEndpoint = new ErrantLinkEndpoint(this, session, ((AmqpErrorException) t).getError());
+        }
+        else
+        {
+            _linkEndpoint = new ErrantLinkEndpoint(this, session, new Error(AmqpError.INTERNAL_ERROR, t.getMessage()));
+        }
         return Futures.immediateFuture(_linkEndpoint);
     }
 
+
     @Override
     public void linkClosed()
     {
@@ -169,6 +178,12 @@ public class LinkImpl implements Link_1_
     }
 
     @Override
+    public Role getRole()
+    {
+        return _role;
+    }
+
+    @Override
     public BaseSource getSource()
     {
         return _source;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Fri Mar 10 14:53:11 2017
@@ -26,10 +26,11 @@ import org.apache.qpid.server.protocol.L
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
 public interface Link_1_0 extends LinkModel
 {
-    ListenableFuture<LinkEndpoint> attach(Session_1_0 session, final Attach attach);
+    ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
 
     void linkClosed();
 
@@ -37,6 +38,8 @@ public interface Link_1_0 extends LinkMo
 
     String getName();
 
+    Role getRole();
+
     BaseSource getSource();
 
     BaseTarget getTarget();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
-public abstract class ReceivingLinkEndpoint extends LinkEndpoint
+public abstract class ReceivingLinkEndpoint extends AbstractLinkEndpoint
 {
     private final SectionDecoder _sectionDecoder;
     private UnsignedInteger _lastDeliveryId;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -74,7 +74,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
-public class SendingLinkEndpoint extends LinkEndpoint
+public class SendingLinkEndpoint extends AbstractLinkEndpoint
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Mar 10 14:53:11 2017
@@ -223,7 +223,7 @@ public class Session_1_0 extends Abstrac
                     link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
                 }
 
-                final ListenableFuture<LinkEndpoint> future = link.attach(this, attach);
+                final ListenableFuture<? extends LinkEndpoint> future = link.attach(this, attach);
 
                 addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
             }
@@ -1510,6 +1510,19 @@ public class Session_1_0 extends Abstrac
         return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
     }
 
+    public void dissociateEndpoint(LinkEndpoint linkEndpoint)
+    {
+        for (Map.Entry<UnsignedInteger, LinkEndpoint> entry : _inputHandleToEndpoint.entrySet())
+        {
+            if (entry.getValue() == linkEndpoint)
+            {
+                _inputHandleToEndpoint.remove(entry.getKey());
+                break;
+            }
+        }
+        _endpointToOutputHandle.remove(linkEndpoint);
+        _associatedLinkEndpoints.remove(linkEndpoint);
+    }
 
     private void detach(UnsignedInteger handle, Detach detach)
     {
@@ -1587,7 +1600,7 @@ public class Session_1_0 extends Abstrac
         return primaryDomain;
     }
 
-    private class EndpointCreationCallback implements FutureCallback<LinkEndpoint>
+    private class EndpointCreationCallback<T extends LinkEndpoint> implements FutureCallback<T>
     {
 
         private final Attach _attach;
@@ -1607,9 +1620,14 @@ public class Session_1_0 extends Abstrac
                 {
                     _associatedLinkEndpoints.add(endpoint);
                     endpoint.setLocalHandle(findNextAvailableOutputHandle());
-                    if (attachWasUnsuccessful(endpoint))
+                    if (endpoint instanceof ErrantLinkEndpoint)
+                    {
+                        endpoint.sendAttach();
+                        ((ErrantLinkEndpoint) endpoint).closeWithError();
+                    }
+                    else if (attachWasUnsuccessful(endpoint))
                     {
-                        endpoint.attach();
+                        endpoint.sendAttach();
 
                         Error error = new Error();
                         error.setCondition(AmqpError.NOT_FOUND);
@@ -1628,7 +1646,7 @@ public class Session_1_0 extends Abstrac
                         if (!_endpointToOutputHandle.containsKey(endpoint))
                         {
                             _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
-                            endpoint.attach();
+                            endpoint.sendAttach();
                             endpoint.start();
                         }
                         else

Modified: qpid/java/trunk/test-profiles/JavaBDBExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaBDBExcludes?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaBDBExcludes (original)
+++ qpid/java/trunk/test-profiles/JavaBDBExcludes Fri Mar 10 14:53:11 2017
@@ -17,3 +17,8 @@
 // under the License.
 //
 
+
+
+// Links are currently not persisted so when the broker is restarted this test fails
+// When Link persistance is implemented (QPID-7663) this test should be included again
+org.apache.qpid.systests.jms_2_0.subscription.SharedSubscriptionTest#testUnsubscribe



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org