You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/08 14:44:42 UTC
svn commit: r1785976 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
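broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/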
Author: lquack
Date: Wed Mar 8 14:44:41 2017
New Revision: 1785976
URL: http://svn.apache.org/viewvc?rev=1785976&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry. Address issues with previous commit.
* Receiving and Coordinating Links are treated the same as far as link uniqueness is concerned
* Only have one Link class. Move most of the attach logic into the specific LinkEndpoints
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
Removed:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Wed Mar 8 14:44:41 2017
@@ -55,7 +55,8 @@ public interface NamedAddressSpace exten
MessageDestination getDefaultDestination();
- <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
+ <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
+ <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
boolean authoriseCreateConnection(AMQPConnection<?> connection);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java Wed Mar 8 14:44:41 2017
@@ -139,9 +139,14 @@ public abstract class AbstractNonConnect
}
@Override
- public <T extends LinkModel> T getLink(final String remoteContainerId,
- final String linkName,
- final Class<T> type)
+ public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
+ {
+ throwUnsupported();
+ return null;
+ }
+
+ @Override
+ public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
{
throwUnsupported();
return null;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Mar 8 14:44:41 2017
@@ -73,13 +73,12 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.exchange.DefaultDestination;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
@@ -100,6 +99,7 @@ import org.apache.qpid.server.model.pref
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
+import org.apache.qpid.server.pool.SuppressingInheritedAccessControlContextThreadFactory;
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AccessControl;
@@ -1609,9 +1609,15 @@ public abstract class AbstractVirtualHos
}
@Override
- public <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type)
+ public <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName)
+ {
+ return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ }
+
+ @Override
+ public <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName)
{
- return _linkRegistry.getLink(remoteContainerId, linkName, type);
+ return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
}
public DtxRegistry getDtxRegistry()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java Wed Mar 8 14:44:41 2017
@@ -24,5 +24,6 @@ import org.apache.qpid.server.protocol.L
public interface LinkRegistry
{
- <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
+ <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
+ <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Wed Mar 8 14:44:41 2017
@@ -47,16 +47,15 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
-public abstract class LinkEndpoint<T extends Link_1_0>
+public abstract class LinkEndpoint
{
private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
- private final T _link;
- private Session_1_0 _session;
+ private final Link_1_0 _link;
+ private final Session_1_0 _session;
private Object _flowTransactionId;
private SenderSettleMode _sendingSettlementMode;
private ReceiverSettleMode _receivingSettlementMode;
private Map _initialUnsettledMap;
- private Map _localUnsettled;
private UnsignedInteger _lastSentCreditLimit;
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
@@ -70,6 +69,7 @@ public abstract class LinkEndpoint<T ext
private Map<Symbol, Object> _properties;
protected volatile State _state = State.ATTACH_RECVD;
+ protected Map _localUnsettled;
protected enum State
{
@@ -82,8 +82,9 @@ public abstract class LinkEndpoint<T ext
}
- LinkEndpoint(final T link)
+ LinkEndpoint(final Session_1_0 session, final Link_1_0 link)
{
+ _session = session;
_link = link;
}
@@ -101,6 +102,38 @@ public abstract class LinkEndpoint<T ext
protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+
+ public void receiveAttach(final Attach attach) throws AmqpErrorException
+ {
+ boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
+ boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
+
+ if (isAttachingLocalTerminusNull)
+ {
+ recoverLink(attach);
+ }
+ else if (isLocalTerminusNull)
+ {
+ establishLink(attach);
+ }
+ else if (attach.getUnsettled() != null)
+ {
+ resumeLink(attach);
+ }
+ else
+ {
+ reattachLink(attach);
+ }
+ }
+
+ protected abstract void reattachLink(final Attach attach) throws AmqpErrorException;
+
+ protected abstract void resumeLink(final Attach attach) throws AmqpErrorException;
+
+ protected abstract void establishLink(final Attach attach) throws AmqpErrorException;
+
+ protected abstract void recoverLink(final Attach attach) throws AmqpErrorException;
+
public void attachReceived(final Attach attach) throws AmqpErrorException
{
_sendingSettlementMode = attach.getSndSettleMode();
@@ -240,20 +273,9 @@ public abstract class LinkEndpoint<T ext
return _session;
}
- public void associateSession(final Session_1_0 session)
- {
- if (session == null)
- {
- throw new IllegalStateException("To dissociate session from Endpoint call LinkEndpoint#dissociateSession() "
- + "instead of LinkEndpoint#associate(null)");
- }
- _session = session;
- }
-
- public void dissociateSession()
+ public void destroy()
{
setLocalHandle(null);
- _session = null;
getLink().discardEndpoint();
}
@@ -329,6 +351,11 @@ public abstract class LinkEndpoint<T ext
_state = State.DETACHED;
break;
default:
+ if (close)
+ {
+ destroy();
+ _link.linkClosed();
+ }
return;
}
@@ -347,7 +374,7 @@ public abstract class LinkEndpoint<T ext
if (close)
{
- dissociateSession();
+ destroy();
_link.linkClosed();
}
setLocalHandle(null);
@@ -435,7 +462,7 @@ public abstract class LinkEndpoint<T ext
}
}
- public T getLink()
+ public Link_1_0 getLink()
{
return _link;
}
@@ -465,10 +492,7 @@ public abstract class LinkEndpoint<T ext
return _initialUnsettledMap;
}
- public void setLocalUnsettled(Map unsettled)
- {
- _localUnsettled = unsettled;
- }
+ public abstract void initialiseUnsettled();
@Override public String toString()
{
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1785976&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Wed Mar 8 14:44:41 2017
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+public class LinkImpl implements Link_1_0
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
+
+ private final String _linkName;
+ private final Role _role;
+ private volatile LinkEndpoint _linkEndpoint;
+ private volatile BaseSource _source;
+ private volatile BaseTarget _target;
+
+ LinkImpl(final String linkName, final Role role)
+ {
+ _linkName = linkName;
+ _role = role;
+ }
+
+ @Override
+ public final ListenableFuture<LinkEndpoint> attach(final Session_1_0 session, final Attach attach)
+ {
+ try
+ {
+ if (_role == attach.getRole())
+ {
+ return rejectLink(session);
+ }
+
+
+ if (_linkEndpoint != null && !session.equals(_linkEndpoint.getSession()))
+ {
+ return stealLink(session, attach);
+ }
+ else
+ {
+ if (_linkEndpoint == null)
+ {
+ _linkEndpoint = createLinkEndpoint(session, attach);
+ if (_linkEndpoint == null)
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "LinkEndpoint creation failed for attach: %s",
+ attach));
+ }
+ }
+
+ _linkEndpoint.receiveAttach(attach);
+ return Futures.immediateFuture(_linkEndpoint);
+ }
+ }
+ catch (Throwable t)
+ {
+ return rejectLink(session);
+ }
+ }
+
+ private synchronized ListenableFuture<LinkEndpoint> stealLink(final Session_1_0 session, final Attach attach)
+ {
+ final SettableFuture<LinkEndpoint> returnFuture = SettableFuture.create();
+ _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _linkEndpoint.close(new Error(LinkError.STOLEN,
+ String.format("Link is being stolen by connection '%s'",
+ session.getConnection())));
+ try
+ {
+ returnFuture.set(attach(session, attach).get());
+ }
+ catch (InterruptedException e)
+ {
+ returnFuture.setException(e);
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ returnFuture.setException(e.getCause());
+ }
+ }
+ });
+ return returnFuture;
+ }
+
+ private LinkEndpoint createLinkEndpoint(final Session_1_0 session, final Attach attach)
+ {
+ LinkEndpoint linkEndpoint = null;
+ if (_role == Role.SENDER)
+ {
+ linkEndpoint = new SendingLinkEndpoint(session, this);
+ }
+ else if (_role == Role.RECEIVER && attach.getTarget() != null)
+ {
+
+ if (attach.getTarget() instanceof Target)
+ {
+ linkEndpoint = new StandardReceivingLinkEndpoint(session, this);
+ }
+ else if (attach.getTarget() instanceof Coordinator)
+ {
+ linkEndpoint = new TxnCoordinatorReceivingLinkEndpoint(session, this);
+ }
+ }
+ return linkEndpoint;
+ }
+
+
+ private ListenableFuture<LinkEndpoint> rejectLink(final Session_1_0 session)
+ {
+ _linkEndpoint = new SendingLinkEndpoint(session, this);
+ _source = null;
+ return Futures.immediateFuture(_linkEndpoint);
+ }
+
+ @Override
+ public void linkClosed()
+ {
+ discardEndpoint();
+ }
+
+ @Override
+ public void discardEndpoint()
+ {
+ _linkEndpoint = null;
+ }
+
+ @Override
+ public final String getName()
+ {
+ return _linkName;
+ }
+
+ @Override
+ public BaseSource getSource()
+ {
+ return _source;
+ }
+
+ @Override
+ public void setSource(BaseSource source)
+ {
+ _source = source;
+ }
+
+ @Override
+ public BaseTarget getTarget()
+ {
+ return _target;
+ }
+
+ @Override
+ public void setTarget(BaseTarget target)
+ {
+ _target = target;
+ }
+
+ @Override
+ public void setTermini(BaseSource source, BaseTarget target)
+ {
+ _source = source;
+ _target = target;
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Wed Mar 8 14:44:41 2017
@@ -24,15 +24,14 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.virtualhost.LinkRegistry;
public class LinkRegistryImpl implements LinkRegistry
{
- private final Map<String, Map<String, SendingLink_1_0>> _sendingLinkRegistry = new HashMap<>();
- private final Map<String, Map<String, StandardReceivingLink_1_0>> _receivingLinkRegistry = new HashMap<>();
- private final Map<String, Map<String, TxnCoordinatorReceivingLink_1_0>> _coordinatorLinkRegistry = new HashMap<>();
+ private final Map<String, Map<String, Link_1_0>> _sendingLinkRegistry = new HashMap<>();
+ private final Map<String, Map<String, Link_1_0>> _receivingLinkRegistry = new HashMap<>();
+
private final NamedAddressSpace _addressSpace;
LinkRegistryImpl(final NamedAddressSpace addressSpace)
@@ -40,73 +39,31 @@ public class LinkRegistryImpl implements
_addressSpace = addressSpace;
}
- @Override
- public synchronized <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+ public Link_1_0 getSendingLink(final String remoteContainerId, final String linkName)
{
- if (SendingLink_1_0.class.equals(type))
- {
- return (T) getSendingLink(remoteContainerId, linkName);
- }
- else if (StandardReceivingLink_1_0.class.equals(type))
- {
- return (T) getReceivingLink(remoteContainerId, linkName);
- }
- else if (TxnCoordinatorReceivingLink_1_0.class.equals(type))
- {
- return (T) getCoordinatorLink(remoteContainerId, linkName);
- }
- else
- {
- throw new ConnectionScopedRuntimeException(String.format("Unsupported link type: '%s'", type.getSimpleName()));
- }
- }
-
- private TxnCoordinatorReceivingLink_1_0 getCoordinatorLink(final String remoteContainerId, final String linkName)
- {
- Map<String, TxnCoordinatorReceivingLink_1_0> containerRegistry = _coordinatorLinkRegistry.get(remoteContainerId);
- if (containerRegistry == null)
- {
- containerRegistry = new HashMap<>();
- _coordinatorLinkRegistry.put(remoteContainerId, containerRegistry);
- }
- TxnCoordinatorReceivingLink_1_0 link = containerRegistry.get(linkName);
- if (link == null)
- {
- link = new TxnCoordinatorReceivingLink_1_0(linkName);
- containerRegistry.put(linkName, link);
- }
- return link;
+ return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
}
- private SendingLink_1_0 getSendingLink(final String remoteContainerId, final String linkName)
+ public Link_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
{
- Map<String, SendingLink_1_0> containerRegistry = _sendingLinkRegistry.get(remoteContainerId);
- if (containerRegistry == null)
- {
- containerRegistry = new HashMap<>();
- _sendingLinkRegistry.put(remoteContainerId, containerRegistry);
- }
- SendingLink_1_0 link = containerRegistry.get(linkName);
- if (link == null)
- {
- link = new SendingLink_1_0(linkName);
- containerRegistry.put(linkName, link);
- }
- return link;
+ return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
}
- private StandardReceivingLink_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
+ private Link_1_0 getLinkFromRegistry(final String remoteContainerId,
+ final String linkName,
+ final Map<String, Map<String, Link_1_0>> linkRegistry,
+ final Role role)
{
- Map<String, StandardReceivingLink_1_0> containerRegistry = _receivingLinkRegistry.get(remoteContainerId);
+ Map<String, Link_1_0> containerRegistry = linkRegistry.get(remoteContainerId);
if (containerRegistry == null)
{
containerRegistry = new HashMap<>();
- _receivingLinkRegistry.put(remoteContainerId, containerRegistry);
+ linkRegistry.put(remoteContainerId, containerRegistry);
}
- StandardReceivingLink_1_0 link = containerRegistry.get(linkName);
+ Link_1_0 link = containerRegistry.get(linkName);
if (link == null)
{
- link = new StandardReceivingLink_1_0(linkName);
+ link = new LinkImpl(linkName, role);
containerRegistry.put(linkName, link);
}
return link;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Wed Mar 8 14:44:41 2017
@@ -29,7 +29,7 @@ import org.apache.qpid.server.protocol.v
public interface Link_1_0 extends LinkModel
{
- ListenableFuture<? extends LinkEndpoint<?>> attach(Session_1_0 session, final Attach attach);
+ ListenableFuture<LinkEndpoint> attach(Session_1_0 session, final Attach attach);
void linkClosed();
@@ -40,4 +40,10 @@ public interface Link_1_0 extends LinkMo
BaseSource getSource();
BaseTarget getTarget();
+
+ void setSource(BaseSource source);
+
+ void setTarget(BaseTarget target);
+
+ void setTermini(BaseSource source, BaseTarget target);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Wed Mar 8 14:44:41 2017
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -38,13 +39,11 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-public abstract class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
+public abstract class ReceivingLinkEndpoint extends LinkEndpoint
{
-
-
+ private final SectionDecoder _sectionDecoder;
private UnsignedInteger _lastDeliveryId;
private ReceivingDestination _receivingDestination;
- private final SectionDecoder _sectionDecoder;
private static class TransientState
{
@@ -91,10 +90,12 @@ public abstract class ReceivingLinkEndpo
private UnsignedInteger _drainLimit;
- public ReceivingLinkEndpoint(final ReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
+ public ReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
{
- super(link);
- _sectionDecoder = sectionDecoder;
+ super(session, link);
+ _sectionDecoder = new SectionDecoderImpl(session.getConnection()
+ .getDescribedTypeRegistry()
+ .getSectionDecoderRegistry());
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Wed Mar 8 14:44:41 2017
@@ -74,7 +74,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-public class SendingLinkEndpoint extends LinkEndpoint<SendingLink_1_0>
+public class SendingLinkEndpoint extends LinkEndpoint
{
private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
@@ -96,9 +96,9 @@ public class SendingLinkEndpoint extends
private ConsumerTarget_1_0 _consumerTarget;
private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
- public SendingLinkEndpoint(final SendingLink_1_0 link)
+ public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl link)
{
- super(link);
+ super(session, link);
setDeliveryCount(UnsignedInteger.valueOf(0));
setAvailable(UnsignedInteger.valueOf(0));
setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
@@ -287,6 +287,94 @@ public class SendingLinkEndpoint extends
}
@Override
+ protected void reattachLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() == null)
+ {
+ throw new IllegalStateException("Terminus should be set when resuming a Link.");
+ }
+ if (attach.getSource() == null)
+ {
+ throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+ }
+
+ Source newSource = (Source) attach.getSource();
+ Source oldSource = (Source) getSource();
+
+ final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource);
+ prepareConsumerOptionsAndFilters(destination);
+
+ if (getDestination() instanceof ExchangeDestination && !Boolean.TRUE.equals(newSource.getDynamic()))
+ {
+ final SendingDestination newDestination =
+ getSession().getSendingDestination(getLinkName(), newSource);
+ if (getSession().updateSourceForSubscription(this, newSource, newDestination))
+ {
+ setDestination(newDestination);
+ }
+ }
+
+ attachReceived(attach);
+ }
+
+ @Override
+ protected void resumeLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() == null)
+ {
+ throw new IllegalStateException("Terminus should be set when resuming a Link.");
+ }
+ if (attach.getSource() == null)
+ {
+ throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+ }
+
+ Source newSource = (Source) attach.getSource();
+ Source oldSource = (Source) getSource();
+
+ final SendingDestination destination = getSession().getSendingDestination(getLinkName(), oldSource);
+ prepareConsumerOptionsAndFilters(destination);
+
+ if (getDestination() instanceof ExchangeDestination && !Boolean.TRUE.equals(newSource.getDynamic()))
+ {
+ final SendingDestination newDestination =
+ getSession().getSendingDestination(getLinkName(), newSource);
+ if (getSession().updateSourceForSubscription(this, newSource, newDestination))
+ {
+ setDestination(newDestination);
+ }
+ }
+
+ attachReceived(attach);
+ initialiseUnsettled();
+ }
+
+ @Override
+ protected void establishLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() != null || getTarget() != null)
+ {
+ throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
+ }
+
+ attachReceived(attach);
+ }
+
+ @Override
+ protected void recoverLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() == null)
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND, ""));
+ }
+
+ final SendingDestination destination = getSession().getSendingDestination(getLinkName(), (Source) getSource());
+ prepareConsumerOptionsAndFilters(destination);
+
+ attachReceived(attach);
+ }
+
+ @Override
public Role getRole()
{
return Role.SENDER;
@@ -299,7 +387,7 @@ public class SendingLinkEndpoint extends
public TerminusDurability getTerminusDurability()
{
- return getLink().getLocalTerminusDurability();
+ return ((Source) getSource()).getDurable();
}
public boolean transfer(final Transfer xfr, final boolean decrementCredit)
@@ -459,10 +547,10 @@ public class SendingLinkEndpoint extends
close();
}
- else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
+ else if (detach.getError() != null)
{
detach();
- dissociateSession();
+ destroy();
getConsumerTarget().updateNotifyWorkDesired();
}
else
@@ -545,6 +633,36 @@ public class SendingLinkEndpoint extends
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
+
+ Target target = (Target) attach.getTarget();
+ Source source = (Source) getSource();
+ if (source == null)
+ {
+ source = new Source();
+ Source attachSource = (Source) attach.getSource();
+
+ final SendingDestination destination = getSession().getSendingDestination(attach.getName(), attachSource);
+ source.setAddress(attachSource.getAddress());
+ source.setDynamic(attachSource.getDynamic());
+ source.setDurable(attachSource.getDurable());
+ source.setExpiryPolicy(attachSource.getExpiryPolicy());
+ source.setDistributionMode(attachSource.getDistributionMode());
+ source.setFilter(attachSource.getFilter());
+ source.setCapabilities(destination.getCapabilities());
+ if (destination instanceof ExchangeDestination)
+ {
+ ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+ exchangeDestination.getQueue()
+ .setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE,
+ org.apache.qpid.server.model.State.ACTIVE));
+ }
+ getLink().setSource(source);
+ prepareConsumerOptionsAndFilters(destination);
+ }
+
+ getLink().setTarget(target);
+
+
final MessageInstanceConsumer consumer = getConsumer();
createConsumerTarget();
_resumeAcceptedTransfers.clear();
@@ -622,16 +740,15 @@ public class SendingLinkEndpoint extends
getConsumerTarget().updateNotifyWorkDesired();
}
- public Map<Binary, MessageInstance> getUnsettledOutcomeMap()
+ @Override
+ public void initialiseUnsettled()
{
- Map<Binary, MessageInstance> unsettled = new HashMap<>(_unsettledMap2);
+ Map<Binary, MessageInstance> _localUnsettled = new HashMap<>(_unsettledMap2);
- for (Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
+ for (Map.Entry<Binary, MessageInstance> entry : _localUnsettled.entrySet())
{
entry.setValue(null);
}
-
- return unsettled;
}
public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Mar 8 14:44:41 2017
@@ -71,7 +71,6 @@ import org.apache.qpid.server.model.Name
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
@@ -95,7 +94,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
@@ -135,9 +133,9 @@ public class Session_1_0 extends Abstrac
private SessionState _sessionState;
- private final Map<LinkEndpoint<?>, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
- private final Map<UnsignedInteger, LinkEndpoint<?>> _inputHandleToEndpoint = new HashMap<>();
- private final Set<LinkEndpoint<?>> _associatedLinkEndpoints = new HashSet<>();
+ private final Map<LinkEndpoint, UnsignedInteger> _endpointToOutputHandle = new HashMap<>();
+ private final Map<UnsignedInteger, LinkEndpoint> _inputHandleToEndpoint = new HashMap<>();
+ private final Set<LinkEndpoint> _associatedLinkEndpoints = new HashSet<>();
private final short _receivingChannel;
private final short _sendingChannel;
@@ -215,27 +213,17 @@ public class Session_1_0 extends Abstrac
}
else
{
- final Class<? extends LinkModel> linkType;
+ final Link_1_0 link;
if (attach.getRole() == Role.RECEIVER)
{
- linkType = SendingLink_1_0.class;
+ link = getAddressSpace().getSendingLink(getConnection().getRemoteContainerId(), attach.getName());
}
else
{
- if (attach.getTarget() instanceof Coordinator)
- {
- linkType = TxnCoordinatorReceivingLink_1_0.class;
- }
- else
- {
- linkType = StandardReceivingLink_1_0.class;
- }
+ link = getAddressSpace().getReceivingLink(getConnection().getRemoteContainerId(), attach.getName());
}
- final Link_1_0 link = (Link_1_0) getAddressSpace().getLink(getConnection().getRemoteContainerId(),
- attach.getName(),
- linkType);
- final ListenableFuture<? extends LinkEndpoint<?>> future = link.attach(this, attach);
+ final ListenableFuture<LinkEndpoint> future = link.attach(this, attach);
addFutureCallback(future, new EndpointCreationCallback(attach), MoreExecutors.directExecutor());
}
@@ -419,7 +407,7 @@ public class Session_1_0 extends Abstrac
public void receiveFlow(final Flow flow)
{
UnsignedInteger handle = flow.getHandle();
- final LinkEndpoint<?> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
+ final LinkEndpoint endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
final UnsignedInteger nextOutgoingId =
flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
@@ -432,8 +420,8 @@ public class Session_1_0 extends Abstrac
}
else
{
- final Collection<LinkEndpoint<?>> allLinkEndpoints = _inputHandleToEndpoint.values();
- for (LinkEndpoint<?> le : allLinkEndpoints)
+ final Collection<LinkEndpoint> allLinkEndpoints = _inputHandleToEndpoint.values();
+ for (LinkEndpoint le : allLinkEndpoints)
{
le.flowStateChanged();
}
@@ -577,7 +565,7 @@ public class Session_1_0 extends Abstrac
_nextIncomingTransferId.incr();
UnsignedInteger inputHandle = transfer.getHandle();
- LinkEndpoint<?> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
+ LinkEndpoint linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
if (linkEndpoint == null)
{
@@ -1119,15 +1107,10 @@ public class Session_1_0 extends Abstrac
void remoteEnd(End end)
{
- List<LinkEndpoint<?>> linkEndpoints = new ArrayList<>(_endpointToOutputHandle.keySet());
- for(LinkEndpoint linkEndpoint : linkEndpoints)
+ for (LinkEndpoint linkEndpoint : _associatedLinkEndpoints)
{
linkEndpoint.remoteDetached(new Detach());
- linkEndpoint.dissociateSession();
- }
- for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
- {
- linkEndpoint.dissociateSession();
+ linkEndpoint.destroy();
}
_associatedLinkEndpoints.clear();
@@ -1215,7 +1198,7 @@ public class Session_1_0 extends Abstrac
@Override
public void transportStateChanged()
{
- for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+ for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof SendingLinkEndpoint)
{
@@ -1251,7 +1234,7 @@ public class Session_1_0 extends Abstrac
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED(queue.getName()));
- for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+ for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof ReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1291,7 +1274,7 @@ public class Session_1_0 extends Abstrac
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
- for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+ for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof ReceivingLinkEndpoint
&& isQueueDestinationForLink(queue, ((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1322,7 +1305,7 @@ public class Session_1_0 extends Abstrac
{
messageWithSubject(ChannelMessages.FLOW_ENFORCED("** All Queues **"));
- for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+ for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof ReceivingLinkEndpoint)
{
@@ -1354,7 +1337,7 @@ public class Session_1_0 extends Abstrac
{
messageWithSubject(ChannelMessages.FLOW_REMOVED());
}
- for (LinkEndpoint<?> linkEndpoint : _endpointToOutputHandle.keySet())
+ for (LinkEndpoint linkEndpoint : _endpointToOutputHandle.keySet())
{
if (linkEndpoint instanceof ReceivingLinkEndpoint
&& !_blockingEntities.contains(((ReceivingLinkEndpoint) linkEndpoint).getReceivingDestination()))
@@ -1532,7 +1515,7 @@ public class Session_1_0 extends Abstrac
{
if(_inputHandleToEndpoint.containsKey(handle))
{
- LinkEndpoint<?> endpoint = _inputHandleToEndpoint.remove(handle);
+ LinkEndpoint endpoint = _inputHandleToEndpoint.remove(handle);
endpoint.remoteDetached(detach);
_endpointToOutputHandle.remove(endpoint);
}
@@ -1553,11 +1536,6 @@ public class Session_1_0 extends Abstrac
detach.setError(_sessionEndedLinkError);
detach(handle, detach);
}
-
- for (LinkEndpoint<?> linkEndpoint : _associatedLinkEndpoints)
- {
- linkEndpoint.dissociateSession();
- }
}
@@ -1609,7 +1587,7 @@ public class Session_1_0 extends Abstrac
return primaryDomain;
}
- private class EndpointCreationCallback<T extends LinkEndpoint<?>> implements FutureCallback<T>
+ private class EndpointCreationCallback implements FutureCallback<LinkEndpoint>
{
private final Attach _attach;
@@ -1620,7 +1598,7 @@ public class Session_1_0 extends Abstrac
}
@Override
- public void onSuccess(final T endpoint)
+ public void onSuccess(final LinkEndpoint endpoint)
{
doOnIOThreadAsync(new Runnable()
{
@@ -1671,7 +1649,7 @@ public class Session_1_0 extends Abstrac
throw new ConnectionScopedRuntimeException(errorMessage, t);
}
- private boolean attachWasUnsuccessful(final T endpoint)
+ private boolean attachWasUnsuccessful(final LinkEndpoint endpoint)
{
if (endpoint.getRole().equals(Role.SENDER))
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Wed Mar 8 14:44:41 2017
@@ -36,7 +36,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -78,9 +78,10 @@ public class StandardReceivingLinkEndpoi
private Binary _messageDeliveryTag;
private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
- public StandardReceivingLinkEndpoint(final StandardReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
+ public StandardReceivingLinkEndpoint(final Session_1_0 session,
+ final Link_1_0 link)
{
- super(link, sectionDecoder);
+ super(session, link);
}
@Override
@@ -331,7 +332,7 @@ public class StandardReceivingLinkEndpoi
else if(detach == null || detach.getError() != null)
{
detach();
- dissociateSession();
+ destroy();
}
else
{
@@ -339,6 +340,7 @@ public class StandardReceivingLinkEndpoi
}
}
+
@Override
protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
{
@@ -455,8 +457,21 @@ public class StandardReceivingLinkEndpoi
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
+
+ Source source = (Source) attach.getSource();
+ Target target = new Target();
+ Target attachTarget = (Target) attach.getTarget();
+
setDeliveryCount(attach.getInitialDeliveryCount());
+ final ReceivingDestination destination = getSession().getReceivingDestination(attachTarget);
+ target.setAddress(attachTarget.getAddress());
+ target.setDynamic(attachTarget.getDynamic());
+ target.setCapabilities(destination.getCapabilities());
+
+ setCapabilities(Arrays.asList(destination.getCapabilities()));
+ setDestination(destination);
+
Map initialUnsettledMap = getInitialUnsettledMap();
Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
@@ -467,12 +482,77 @@ public class StandardReceivingLinkEndpoi
_unsettledMap.remove(deliveryTag);
}
}
+
+ getLink().setTermini(source, target);
+ }
+
+ @Override
+ public void initialiseUnsettled()
+ {
+ _localUnsettled = new HashMap(_unsettledMap);
+ }
+
+ @Override
+ protected void recoverLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getTarget() == null || !(getTarget() instanceof Target))
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_FOUND,
+ String.format("Link '%s' not found", getLinkName())));
+ }
+
+ Source source = (Source) attach.getSource();
+ Target target = (Target) getTarget();
+
+ final ReceivingDestination destination = getSession().getReceivingDestination((Target) getTarget());
+ target.setCapabilities(destination.getCapabilities());
+ setCapabilities(Arrays.asList(destination.getCapabilities()));
+ setDestination(destination);
+ attachReceived(attach);
+
+ getLink().setTermini(source, target);
+ }
+
+
+ @Override
+ protected void reattachLink(final Attach attach) throws AmqpErrorException
+ {
+ if (attach.getTarget() instanceof Coordinator)
+ {
+ throw new AmqpErrorException(new Error(AmqpError.PRECONDITION_FAILED, "Cannot reattach standard receiving Link as a transaction coordinator"));
+ }
+
+ attachReceived(attach);
}
- public Map<Binary, Outcome> getUnsettledOutcomeMap()
+ @Override
+ protected void resumeLink(final Attach attach) throws AmqpErrorException
{
- return _unsettledMap;
+ if (getTarget() == null)
+ {
+ throw new IllegalStateException("Terminus should be set when resuming a Link.");
+ }
+ if (attach.getTarget() == null)
+ {
+ throw new IllegalStateException("Attach.getTarget should not be null when resuming a Link. That would be recovering the Link.");
+ }
+ if (attach.getTarget() instanceof Coordinator)
+ {
+ throw new AmqpErrorException(new Error(AmqpError.PRECONDITION_FAILED, "Cannot resume standard receiving Link as a transaction coordinator"));
+ }
+
+ attachReceived(attach);
+ initialiseUnsettled();
}
+ @Override
+ protected void establishLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() != null || getTarget() != null)
+ {
+ throw new IllegalStateException("Termini should be null when establishing a Link.");
+ }
+ attachReceived(attach);
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Wed Mar 8 14:44:41 2017
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -33,6 +32,8 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
@@ -51,10 +52,9 @@ public class TxnCoordinatorReceivingLink
private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
private ArrayList<Transfer> _incompleteMessage;
- public TxnCoordinatorReceivingLinkEndpoint(final TxnCoordinatorReceivingLink_1_0 link,
- final SectionDecoder sectionDecoder)
+ public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0 link)
{
- super(link, sectionDecoder);
+ super(session, link);
}
@Override
@@ -217,6 +217,39 @@ public class TxnCoordinatorReceivingLink
}
@Override
+ protected void reattachLink(final Attach attach) throws AmqpErrorException
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
+ }
+
+ @Override
+ protected void resumeLink(final Attach attach) throws AmqpErrorException
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot resume a Coordinator Link."));
+ }
+
+ @Override
+ protected void establishLink(final Attach attach) throws AmqpErrorException
+ {
+ if (getSource() != null || getTarget() != null)
+ {
+ throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
+ }
+
+ Coordinator target = new Coordinator();
+ Source source = (Source) attach.getSource();
+ getLink().setTermini(source, target);
+
+ attachReceived(attach);
+ }
+
+ @Override
+ protected void recoverLink(final Attach attach) throws AmqpErrorException
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot recover a Coordinator Link."));
+ }
+
+ @Override
protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
@@ -228,4 +261,9 @@ public class TxnCoordinatorReceivingLink
super.attachReceived(attach);
setDeliveryCount(attach.getInitialDeliveryCount());
}
+
+ @Override
+ public void initialiseUnsettled()
+ {
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java Wed Mar 8 14:44:41 2017
@@ -38,29 +38,25 @@ public class LinkRegistryTest extends Qp
_linkRegistry = new LinkRegistryImpl(_virtualHost);
}
- private void doTestGetLink(final Class<? extends LinkModel> type) throws Exception
+ public void testGetSendingLink() throws Exception
{
String remoteContainerId = "testRemoteContainerId";
String linkName = "testLinkName";
- LinkModel link = _linkRegistry.getLink(remoteContainerId, linkName, type);
- assertNotNull("LinkRegistry#getLink should always return an object", link);
- LinkModel link2 = _linkRegistry.getLink(remoteContainerId, linkName, type);
- assertNotNull("LinkRegistry#getLink should always return an object", link2);
- assertSame("Two calls to LinkRegistry#getLink should return the same object", link, link2);
- }
-
- public void testGetSendingLink() throws Exception
- {
- doTestGetLink(SendingLink_1_0.class);
+ LinkModel link = _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ assertNotNull("LinkRegistry#getSendingLink should always return an object", link);
+ LinkModel link2 = _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ assertNotNull("LinkRegistry#getSendingLink should always return an object", link2);
+ assertSame("Two calls to LinkRegistry#getSendingLink should return the same object", link, link2);
}
public void testGetReceivingLink() throws Exception
{
- doTestGetLink(StandardReceivingLink_1_0.class);
- }
-
- public void testGetCoordinatingLink() throws Exception
- {
- doTestGetLink(TxnCoordinatorReceivingLink_1_0.class);
+ String remoteContainerId = "testRemoteContainerId";
+ String linkName = "testLinkName";
+ LinkModel link = _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+ assertNotNull("LinkRegistry#getReceivingLink should always return an object", link);
+ LinkModel link2 = _linkRegistry.getReceivingLink(remoteContainerId, linkName);
+ assertNotNull("LinkRegistry#getReceivingLink should always return an object", link2);
+ assertSame("Two calls to LinkRegistry#getReceivingLink should return the same object", link, link2);
}
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1785976&r1=1785975&r2=1785976&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Wed Mar 8 14:44:41 2017
@@ -40,8 +40,8 @@ import javax.security.auth.Subject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSender;
@@ -56,7 +56,6 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemAddressSpaceCreator;
import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.session.AMQPSession;
@@ -67,7 +66,7 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.txn.DtxNotSupportedException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
@@ -228,9 +227,15 @@ public class ManagementAddressSpace impl
}
@Override
- public <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+ public <T extends LinkModel> T getSendingLink(final String remoteContainerId, final String linkName)
+ {
+ return _linkRegistry.getSendingLink(remoteContainerId, linkName);
+ }
+
+ @Override
+ public <T extends LinkModel> T getReceivingLink(final String remoteContainerId, final String linkName)
{
- return _linkRegistry.getLink(remoteContainerId, linkName, type);
+ return _linkRegistry.getReceivingLink(remoteContainerId, linkName);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org