You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/10 14:53:11 UTC
svn commit: r1786358 - in /qpid/java/trunk:
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
test-profiles/
Author: lquack
Date: Fri Mar 10 14:53:11 2017
New Revision: 1786358
URL: http://svn.apache.org/viewvc?rev=1786358&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry. Propagate errors during attach back to remote endpoint.
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
- copied, changed from r1786342, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/test-profiles/JavaBDBExcludes
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java (from r1786342, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java&r1=1786342&r2=1786358&rev=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -47,9 +47,9 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
-public abstract class LinkEndpoint
+public abstract class AbstractLinkEndpoint implements LinkEndpoint
{
- private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
private final Link_1_0 _link;
private final Session_1_0 _session;
private Object _flowTransactionId;
@@ -82,20 +82,12 @@ public abstract class LinkEndpoint
}
- LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
+ AbstractLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
{
_session = session;
_link = link;
}
- public abstract void start();
-
- public abstract Role getRole();
-
- public abstract void flowStateChanged();
-
- public abstract void receiveFlow(final Flow flow);
-
protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
protected abstract void remoteDetachedPerformDetach(final Detach detach);
@@ -103,6 +95,7 @@ public abstract class LinkEndpoint
protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+ @Override
public void receiveAttach(final Attach attach) throws AmqpErrorException
{
boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
@@ -148,6 +141,7 @@ public abstract class LinkEndpoint
return _stopped;
}
+ @Override
public void setStopped(final boolean stopped)
{
if(_stopped != stopped)
@@ -163,11 +157,13 @@ public abstract class LinkEndpoint
return _link.getName();
}
+ @Override
public BaseSource getSource()
{
return _link.getSource();
}
+ @Override
public BaseTarget getTarget()
{
return _link.getTarget();
@@ -218,6 +214,7 @@ public abstract class LinkEndpoint
return _linkCredit;
}
+ @Override
public void remoteDetached(final Detach detach)
{
switch (_state)
@@ -236,6 +233,7 @@ public abstract class LinkEndpoint
{
}
+ @Override
public void receiveDeliveryState(final Delivery unsettled,
final DeliveryState state,
final Boolean settled)
@@ -253,7 +251,8 @@ public abstract class LinkEndpoint
}
- void setLocalHandle(final UnsignedInteger localHandle)
+ @Override
+ public void setLocalHandle(final UnsignedInteger localHandle)
{
_localHandle = localHandle;
}
@@ -268,23 +267,27 @@ public abstract class LinkEndpoint
return _state == State.DETACHED || _session.isEnded();
}
+ @Override
public Session_1_0 getSession()
{
return _session;
}
+ @Override
public void destroy()
{
setLocalHandle(null);
getLink().discardEndpoint();
}
- UnsignedInteger getLocalHandle()
+ @Override
+ public UnsignedInteger getLocalHandle()
{
return _localHandle;
}
- public void attach()
+ @Override
+ public void sendAttach()
{
Attach attachToSend = new Attach();
attachToSend.setName(getLinkName());
@@ -334,6 +337,7 @@ public abstract class LinkEndpoint
detach(error, false);
}
+ @Override
public void close(Error error)
{
detach(error, true);
@@ -351,8 +355,10 @@ public abstract class LinkEndpoint
_state = State.DETACHED;
break;
default:
+ // "silent link stealing"
if (close)
{
+ getSession().dissociateEndpoint(this);
destroy();
_link.linkClosed();
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java?rev=1786358&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class ErrantLinkEndpoint implements LinkEndpoint
+{
+ private final Link_1_0 _link;
+ private final Session_1_0 _session;
+ private Error _error;
+ private UnsignedInteger _localHandle;
+
+ ErrantLinkEndpoint(Link_1_0 link, Session_1_0 session, Error error)
+ {
+ _link = link;
+ _session = session;
+ _error = error;
+ }
+
+ @Override
+ public Role getRole()
+ {
+ return _link.getRole();
+ }
+
+ @Override
+ public BaseSource getSource()
+ {
+ return null;
+ }
+
+ @Override
+ public BaseTarget getTarget()
+ {
+ return null;
+ }
+
+ @Override
+ public Session_1_0 getSession()
+ {
+ return _session;
+ }
+
+ @Override
+ public UnsignedInteger getLocalHandle()
+ {
+ return _localHandle;
+ }
+
+ @Override
+ public void setLocalHandle(final UnsignedInteger localHandle)
+ {
+ _localHandle = localHandle;
+ }
+
+ @Override
+ public void sendAttach()
+ {
+ Attach attachToSend = new Attach();
+ attachToSend.setName(_link.getName());
+ attachToSend.setRole(getRole());
+ attachToSend.setHandle(getLocalHandle());
+ attachToSend.setSource(getSource());
+ attachToSend.setTarget(getTarget());
+ _session.sendAttach(attachToSend);
+ }
+
+ @Override
+ public void receiveAttach(final Attach attach) throws AmqpErrorException
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
+ public void destroy()
+ {
+ setLocalHandle(null);
+ _link.discardEndpoint();
+ }
+
+ public void closeWithError()
+ {
+ close(_error);
+ }
+
+ @Override
+ public void close(final Error error)
+ {
+ Detach detach = new Detach();
+ detach.setHandle(_localHandle);
+ detach.setClosed(true);
+ detach.setError(error);
+ _session.sendDetach(detach);
+ _session.dissociateEndpoint(this);
+ destroy();
+ _link.linkClosed();
+ }
+
+ @Override
+ public void remoteDetached(final Detach detach)
+ {
+ // ignore
+ }
+
+ @Override
+ public void receiveFlow(final Flow flow)
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
+ public void flowStateChanged()
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
+ public void start()
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
+ public void setStopped(final boolean stopped)
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
+ public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,499 +15,54 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
-public abstract class LinkEndpoint
+public interface LinkEndpoint
{
- private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
- private final Link_1_0 _link;
- private final Session_1_0 _session;
- private Object _flowTransactionId;
- private SenderSettleMode _sendingSettlementMode;
- private ReceiverSettleMode _receivingSettlementMode;
- private Map _initialUnsettledMap;
- private UnsignedInteger _lastSentCreditLimit;
- private volatile boolean _stopped;
- private volatile boolean _stoppedUpdated;
- private Symbol[] _capabilities;
- private UnsignedInteger _deliveryCount;
- private UnsignedInteger _linkCredit;
- private UnsignedInteger _available;
- private Boolean _drain;
- private UnsignedInteger _localHandle;
- private UnsignedLong _maxMessageSize;
- private Map<Symbol, Object> _properties;
-
- protected volatile State _state = State.ATTACH_RECVD;
- protected Map _localUnsettled;
-
- protected enum State
- {
- DETACHED,
- ATTACH_SENT,
- ATTACH_RECVD,
- ATTACHED,
- DETACH_SENT,
- DETACH_RECVD
- }
-
-
- LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
- {
- _session = session;
- _link = link;
- }
-
- public abstract void start();
-
- public abstract Role getRole();
-
- public abstract void flowStateChanged();
-
- public abstract void receiveFlow(final Flow flow);
-
- protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
- protected abstract void remoteDetachedPerformDetach(final Detach detach);
-
- protected abstract Map<Symbol,Object> initProperties(final Attach attach);
-
-
- public void receiveAttach(final Attach attach) throws AmqpErrorException
- {
- boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
- boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
-
- if (isAttachingLocalTerminusNull)
- {
- recoverLink(attach);
- }
- else if (isLocalTerminusNull)
- {
- establishLink(attach);
- }
- else if (attach.getUnsettled() != null)
- {
- resumeLink(attach);
- }
- else
- {
- reattachLink(attach);
- }
- }
-
- protected abstract void reattachLink(final Attach attach) throws AmqpErrorException;
-
- protected abstract void resumeLink(final Attach attach) throws AmqpErrorException;
-
- protected abstract void establishLink(final Attach attach) throws AmqpErrorException;
-
- protected abstract void recoverLink(final Attach attach) throws AmqpErrorException;
-
- public void attachReceived(final Attach attach) throws AmqpErrorException
- {
- _sendingSettlementMode = attach.getSndSettleMode();
- _receivingSettlementMode = attach.getRcvSettleMode();
- _initialUnsettledMap = attach.getUnsettled();
- _properties = initProperties(attach);
- _state = State.ATTACH_RECVD;
- }
-
- public boolean isStopped()
- {
- return _stopped;
- }
-
- public void setStopped(final boolean stopped)
- {
- if(_stopped != stopped)
- {
- _stopped = stopped;
- _stoppedUpdated = true;
- sendFlowConditional();
- }
- }
-
- public String getLinkName()
- {
- return _link.getName();
- }
-
- public BaseSource getSource()
- {
- return _link.getSource();
- }
-
- public BaseTarget getTarget()
- {
- return _link.getTarget();
- }
-
- public NamedAddressSpace getAddressSpace()
- {
- return getSession().getConnection().getAddressSpace();
- }
-
- public void setDeliveryCount(final UnsignedInteger deliveryCount)
- {
- _deliveryCount = deliveryCount;
- }
-
- public void setLinkCredit(final UnsignedInteger linkCredit)
- {
- _linkCredit = linkCredit;
- }
-
- public void setAvailable(final UnsignedInteger available)
- {
- _available = available;
- }
-
- public void setDrain(final Boolean drain)
- {
- _drain = drain;
- }
-
- public UnsignedInteger getDeliveryCount()
- {
- return _deliveryCount;
- }
-
- public UnsignedInteger getAvailable()
- {
- return _available;
- }
-
- public Boolean getDrain()
- {
- return _drain;
- }
-
- public UnsignedInteger getLinkCredit()
- {
- return _linkCredit;
- }
-
- public void remoteDetached(final Detach detach)
- {
- switch (_state)
- {
- case DETACH_SENT:
- _state = State.DETACHED;
- break;
- case ATTACHED:
- _state = State.DETACH_RECVD;
- remoteDetachedPerformDetach(detach);
- break;
- }
- }
-
- public void addUnsettled(final Delivery unsettled)
- {
- }
-
- public void receiveDeliveryState(final Delivery unsettled,
- final DeliveryState state,
- final Boolean settled)
- {
- handle(unsettled.getDeliveryTag(), state, settled);
-
- if (Boolean.TRUE.equals(settled))
- {
- settle(unsettled.getDeliveryTag());
- }
- }
-
- public void settle(final Binary deliveryTag)
- {
-
- }
-
- void setLocalHandle(final UnsignedInteger localHandle)
- {
- _localHandle = localHandle;
- }
-
- boolean isAttached()
- {
- return _state == State.ATTACHED;
- }
-
- boolean isDetached()
- {
- return _state == State.DETACHED || _session.isEnded();
- }
-
- public Session_1_0 getSession()
- {
- return _session;
- }
-
- public void destroy()
- {
- setLocalHandle(null);
- getLink().discardEndpoint();
- }
-
- UnsignedInteger getLocalHandle()
- {
- return _localHandle;
- }
-
- public void attach()
- {
- Attach attachToSend = new Attach();
- attachToSend.setName(getLinkName());
- attachToSend.setRole(getRole());
- attachToSend.setHandle(getLocalHandle());
- attachToSend.setSource(getSource());
- attachToSend.setTarget(getTarget());
- attachToSend.setSndSettleMode(getSendingSettlementMode());
- attachToSend.setRcvSettleMode(getReceivingSettlementMode());
- attachToSend.setUnsettled(_localUnsettled);
- attachToSend.setProperties(_properties);
- attachToSend.setOfferedCapabilities(_capabilities);
-
- if (getRole() == Role.SENDER)
- {
- attachToSend.setInitialDeliveryCount(_deliveryCount);
- }
-
- switch (_state)
- {
- case DETACHED:
- _state = State.ATTACH_SENT;
- break;
- case ATTACH_RECVD:
- _state = State.ATTACHED;
- break;
- default:
- throw new UnsupportedOperationException(_state.toString());
- }
-
- getSession().sendAttach(attachToSend);
-
- }
-
- public void detach()
- {
- detach(null, false);
- }
-
- public void close()
- {
- detach(null, true);
- }
-
- public void detach(Error error)
- {
- detach(error, false);
- }
-
- public void close(Error error)
- {
- detach(error, true);
- }
-
- private void detach(Error error, boolean close)
- {
- //TODO
- switch (_state)
- {
- case ATTACHED:
- _state = State.DETACH_SENT;
- break;
- case DETACH_RECVD:
- _state = State.DETACHED;
- break;
- default:
- if (close)
- {
- destroy();
- _link.linkClosed();
- }
- return;
- }
-
- if (getSession().getSessionState() != SessionState.END_RECVD && !getSession().isEnded())
- {
- Detach detach = new Detach();
- detach.setHandle(getLocalHandle());
- if (close)
- {
- detach.setClosed(close);
- }
- detach.setError(error);
-
- getSession().sendDetach(detach);
- }
-
- if (close)
- {
- destroy();
- _link.linkClosed();
- }
- setLocalHandle(null);
- }
-
- public void setTransactionId(final Object txnId)
- {
- _flowTransactionId = txnId;
- }
-
- public void sendFlowConditional()
- {
- if(_lastSentCreditLimit != null)
- {
- if(_stoppedUpdated)
- {
- sendFlow(_flowTransactionId != null);
- _stoppedUpdated = false;
- }
- else
- {
- UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
-
- // client has used up over half their credit allowance ?
- boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
- if (sendFlow)
- {
- sendFlow(_flowTransactionId != null);
- }
- else
- {
- getSession().sendFlowConditional();
- }
- }
- }
- else
- {
- sendFlow(_flowTransactionId != null);
- }
- }
-
-
- public void sendFlow()
- {
- sendFlow(_flowTransactionId != null);
- }
-
- public void sendFlowWithEcho()
- {
- sendFlow(_flowTransactionId != null, true);
- }
-
-
- public void sendFlow(boolean setTransactionId)
- {
- sendFlow(setTransactionId, false);
- }
-
- public void sendFlow(boolean setTransactionId, boolean echo)
- {
- if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
- {
- Flow flow = new Flow();
- flow.setDeliveryCount(_deliveryCount);
- flow.setEcho(echo);
- if(_stopped)
- {
- flow.setLinkCredit(UnsignedInteger.ZERO);
- flow.setDrain(true);
- _lastSentCreditLimit = _deliveryCount;
- }
- else
- {
- flow.setLinkCredit(_linkCredit);
- _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
- flow.setDrain(_drain);
- }
- flow.setAvailable(_available);
- if(setTransactionId)
- {
- flow.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));
- }
- flow.setHandle(getLocalHandle());
- getSession().sendFlow(flow);
- }
- }
-
- public Link_1_0 getLink()
- {
- return _link;
- }
-
- public SenderSettleMode getSendingSettlementMode()
- {
- return _sendingSettlementMode;
- }
-
- public ReceiverSettleMode getReceivingSettlementMode()
- {
- return _receivingSettlementMode;
- }
-
- public List<Symbol> getCapabilities()
- {
- return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
- }
-
- public void setCapabilities(Collection<Symbol> capabilities)
- {
- _capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
- }
-
- public Map getInitialUnsettledMap()
- {
- return _initialUnsettledMap;
- }
-
- public abstract void initialiseUnsettled();
-
- @Override public String toString()
- {
- return "LinkEndpoint{" +
- "_name='" + getLinkName() + '\'' +
- ", _session=" + _session +
- ", _state=" + _state +
- ", _role=" + getRole() +
- ", _source=" + getSource() +
- ", _target=" + getTarget() +
- ", _transferCount=" + _deliveryCount +
- ", _linkCredit=" + _linkCredit +
- ", _available=" + _available +
- ", _drain=" + _drain +
- ", _localHandle=" + _localHandle +
- ", _maxMessageSize=" + _maxMessageSize +
- '}';
- }
+ Role getRole();
+
+ BaseSource getSource();
+
+ BaseTarget getTarget();
+
+ Session_1_0 getSession();
+
+ UnsignedInteger getLocalHandle();
+
+ void setLocalHandle(UnsignedInteger localHandle);
+
+ void receiveAttach(Attach attach) throws AmqpErrorException;
+
+ void sendAttach();
+
+ void remoteDetached(Detach detach);
+
+ void receiveDeliveryState(Delivery unsettled,
+ DeliveryState state,
+ Boolean settled);
+
+ void receiveFlow(Flow flow);
+
+ void flowStateChanged();
+
+ void start();
+
+ void setStopped(boolean stopped);
+
+ void destroy();
+
+ void close(Error error);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Fri Mar 10 14:53:11 2017
@@ -27,10 +27,12 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -42,11 +44,12 @@ public class LinkImpl implements Link_1_
private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
private final String _linkName;
+
private final Role _role;
+
private volatile LinkEndpoint _linkEndpoint;
private volatile BaseSource _source;
private volatile BaseTarget _target;
-
LinkImpl(final String linkName, final Role role)
{
_linkName = linkName;
@@ -54,13 +57,13 @@ public class LinkImpl implements Link_1_
}
@Override
- public final ListenableFuture<LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
+ public final ListenableFuture<? extends LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
{
try
{
if (_role == attach.getRole())
{
- return rejectLink(session);
+ throw new AmqpErrorException(new Error(AmqpError.ILLEGAL_STATE, "Cannot switch SendingLink to ReceivingLink and vice versa"));
}
@@ -82,12 +85,12 @@ public class LinkImpl implements Link_1_
}
_linkEndpoint.receiveAttach(attach);
- return Futures.immediateFuture(_linkEndpoint);
+ return Futures.immediateFuture((LinkEndpoint) _linkEndpoint);
}
}
catch (Throwable t)
{
- return rejectLink(session);
+ return rejectLink(session, t);
}
}
@@ -142,14 +145,20 @@ public class LinkImpl implements Link_1_
return linkEndpoint;
}
-
- private ListenableFuture<LinkEndpoint> rejectLink(final Session_1_0 session)
+ private ListenableFuture<? extends LinkEndpoint> rejectLink(final Session_1_0 session, Throwable t)
{
- _linkEndpoint = new SendingLinkEndpoint(session, this);
- _source = null;
+ if (t instanceof AmqpErrorException)
+ {
+ _linkEndpoint = new ErrantLinkEndpoint(this, session, ((AmqpErrorException) t).getError());
+ }
+ else
+ {
+ _linkEndpoint = new ErrantLinkEndpoint(this, session, new Error(AmqpError.INTERNAL_ERROR, t.getMessage()));
+ }
return Futures.immediateFuture(_linkEndpoint);
}
+
@Override
public void linkClosed()
{
@@ -169,6 +178,12 @@ public class LinkImpl implements Link_1_
}
@Override
+ public Role getRole()
+ {
+ return _role;
+ }
+
+ @Override
public BaseSource getSource()
{
return _source;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Fri Mar 10 14:53:11 2017
@@ -26,10 +26,11 @@ import org.apache.qpid.server.protocol.L
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
public interface Link_1_0 extends LinkModel
{
- ListenableFuture<LinkEndpoint> attach(Session_1_0 session, final Attach attach);
+ ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
void linkClosed();
@@ -37,6 +38,8 @@ public interface Link_1_0 extends LinkMo
String getName();
+ Role getRole();
+
BaseSource getSource();
BaseTarget getTarget();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-public abstract class ReceivingLinkEndpoint extends LinkEndpoint
+public abstract class ReceivingLinkEndpoint extends AbstractLinkEndpoint
{
private final SectionDecoder _sectionDecoder;
private UnsignedInteger _lastDeliveryId;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Mar 10 14:53:11 2017
@@ -74,7 +74,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-public class SendingLinkEndpoint extends LinkEndpoint
+public class SendingLinkEndpoint extends AbstractLinkEndpoint
{
private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Mar 10 14:53:11 2017
@@ -223,7 +223,7 @@ public class Session_1_0 extends Abstrac
link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
}
- final ListenableFuture<LinkEndpoint> future = link.attach(this, attach);
+ final ListenableFuture<? extends LinkEndpoint> future = link.attach(this, attach);
addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
}
@@ -1510,6 +1510,19 @@ public class Session_1_0 extends Abstrac
return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';
}
+ public void dissociateEndpoint(LinkEndpoint linkEndpoint)
+ {
+ for (Map.Entry<UnsignedInteger, LinkEndpoint> entry : _inputHandleToEndpoint.entrySet())
+ {
+ if (entry.getValue() == linkEndpoint)
+ {
+ _inputHandleToEndpoint.remove(entry.getKey());
+ break;
+ }
+ }
+ _endpointToOutputHandle.remove(linkEndpoint);
+ _associatedLinkEndpoints.remove(linkEndpoint);
+ }
private void detach(UnsignedInteger handle, Detach detach)
{
@@ -1587,7 +1600,7 @@ public class Session_1_0 extends Abstrac
return primaryDomain;
}
- private class EndpointCreationCallback implements FutureCallback<LinkEndpoint>
+ private class EndpointCreationCallback<T extends LinkEndpoint> implements FutureCallback<T>
{
private final Attach _attach;
@@ -1607,9 +1620,14 @@ public class Session_1_0 extends Abstrac
{
_associatedLinkEndpoints.add(endpoint);
endpoint.setLocalHandle(findNextAvailableOutputHandle());
- if (attachWasUnsuccessful(endpoint))
+ if (endpoint instanceof ErrantLinkEndpoint)
+ {
+ endpoint.sendAttach();
+ ((ErrantLinkEndpoint) endpoint).closeWithError();
+ }
+ else if (attachWasUnsuccessful(endpoint))
{
- endpoint.attach();
+ endpoint.sendAttach();
Error error = new Error();
error.setCondition(AmqpError.NOT_FOUND);
@@ -1628,7 +1646,7 @@ public class Session_1_0 extends Abstrac
if (!_endpointToOutputHandle.containsKey(endpoint))
{
_endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
- endpoint.attach();
+ endpoint.sendAttach();
endpoint.start();
}
else
Modified: qpid/java/trunk/test-profiles/JavaBDBExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaBDBExcludes?rev=1786358&r1=1786357&r2=1786358&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaBDBExcludes (original)
+++ qpid/java/trunk/test-profiles/JavaBDBExcludes Fri Mar 10 14:53:11 2017
@@ -17,3 +17,8 @@
// under the License.
//
+
+
+// Links are currently not persisted so when the broker is restarted this test fails
+// When Link persistance is implemented (QPID-7663) this test should be included again
+org.apache.qpid.systests.jms_2_0.subscription.SharedSubscriptionTest#testUnsubscribe
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org