You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/03/06 15:06:57 UTC
svn commit: r1785660 [1/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/or...
Author: lquack
Date: Mon Mar 6 15:06:57 2017
New Revision: 1785660
URL: http://svn.apache.org/viewvc?rev=1785660&view=rev
Log:
QPID-7658: [Java Broker] Improve LinkRegistry
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
- copied, changed from r1785348, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
- copied, changed from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
- copied, changed from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistryImpl.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Mon Mar 6 15:06:57 2017
@@ -133,7 +133,7 @@ public abstract class AbstractConsumerTa
@Override
public boolean processPending()
{
- if (!getSession().getAMQPConnection().isIOThread())
+ if (getSession() == null || !getSession().getAMQPConnection().isIOThread())
{
return false;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Mon Mar 6 15:06:57 2017
@@ -2610,8 +2610,8 @@ public abstract class AbstractConfigured
return returnVal;
}
- protected static <V> void addFutureCallback(ListenableFuture<V> future, final FutureCallback<V> callback,
- Executor taskExecutor)
+ public static <V> void addFutureCallback(ListenableFuture<V> future, final FutureCallback<V> callback,
+ Executor taskExecutor)
{
final Subject subject = Subject.getSubject(AccessController.getContext());
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Mon Mar 6 15:06:57 2017
@@ -29,7 +29,7 @@ import java.util.UUID;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -55,7 +55,7 @@ public interface NamedAddressSpace exten
MessageDestination getDefaultDestination();
- LinkRegistry getLinkRegistry(String remoteContainerId);
+ <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
boolean authoriseCreateConnection(AMQPConnection<?> connection);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java Mon Mar 6 15:06:57 2017
@@ -23,6 +23,8 @@ package org.apache.qpid.server.session;
import java.util.Collection;
import java.util.UUID;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
@@ -95,4 +97,6 @@ public interface AMQPSession<S extends o
void notifyWork(X target);
void close();
+
+ ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java Mon Mar 6 15:06:57 2017
@@ -33,8 +33,10 @@ import java.util.concurrent.CopyOnWriteA
import javax.security.auth.Subject;
import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -59,6 +61,7 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.TransactionTimeoutTicker;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureHelper;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.transport.network.Ticker;
@@ -411,4 +414,18 @@ public abstract class AbstractAMQPSessio
protected abstract void updateBlockedStateIfNecessary();
public abstract boolean isClosing();
+
+ @Override
+ public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
+ {
+ final ListenableFuture<Void> future = getAMQPConnection().doOnIOThreadAsync(task);
+ return doAfter(MoreExecutors.directExecutor(), future, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ getAMQPConnection().notifyWork(AbstractAMQPSession.this);
+ }
+ });
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java Mon Mar 6 15:06:57 2017
@@ -39,7 +39,7 @@ import org.apache.qpid.server.model.Conn
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -139,8 +139,11 @@ public abstract class AbstractNonConnect
}
@Override
- public LinkRegistry getLinkRegistry(final String remoteContainerId)
+ public <T extends LinkModel> T getLink(final String remoteContainerId,
+ final String linkName,
+ final Class<T> type)
{
+ throwUnsupported();
return null;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Mar 6 15:06:57 2017
@@ -100,8 +100,7 @@ import org.apache.qpid.server.model.pref
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.protocol.LinkRegistryImpl;
+import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.CompoundAccessControl;
@@ -179,7 +178,7 @@ public abstract class AbstractVirtualHos
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+ private volatile LinkRegistry _linkRegistry;
private AtomicBoolean _blocked = new AtomicBoolean();
private final Map<String, MessageDestination> _systemNodeDestinations =
@@ -596,6 +595,21 @@ public abstract class AbstractVirtualHos
PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
_preferenceStore = preferencesRoot.createPreferenceStore();
+ Iterator<LinkRegistryFactory> linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
+ if (linkRegistryFactories.hasNext())
+ {
+ final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
+ if (linkRegistryFactories.hasNext())
+ {
+ throw new RuntimeException("Found multiple implementations of LinkRegistry");
+ }
+ _linkRegistry = linkRegistryFactory.create(this);
+ }
+ else
+ {
+ _linkRegistry = null;
+ }
+
createHousekeepingExecutor();
}
@@ -1594,15 +1608,10 @@ public abstract class AbstractVirtualHos
}
}
- public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+ @Override
+ public <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type)
{
- LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
- if(linkRegistry == null)
- {
- linkRegistry = new LinkRegistryImpl();
- _linkRegistry.put(remoteContainerId, linkRegistry);
- }
- return linkRegistry;
+ return _linkRegistry.getLink(remoteContainerId, linkName, type);
}
public DtxRegistry getDtxRegistry()
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (from r1785348, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java Mon Mar 6 15:06:57 2017
@@ -18,17 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.protocol.LinkModel;
public interface LinkRegistry
{
- LinkModel getDurableSendingLink(String name);
-
- boolean registerSendingLink(String name, LinkModel link);
-
- boolean unregisterSendingLink(String name);
-
- LinkModel getDurableReceivingLink(String name);
-
- boolean registerReceivingLink(String name, LinkModel link);
+ <T extends LinkModel> T getLink(String remoteContainerId, String linkName, Class<T> type);
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java (from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java Mon Mar 6 15:06:57 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,14 +15,14 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.Pluggable;
-public interface ReceivingLink_1_0 extends Link_1_0
+public interface LinkRegistryFactory extends Pluggable
{
- void setLinkAttachmentToNull();
+ LinkRegistry create(NamedAddressSpace addressSpace);
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java?rev=1785660&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLink.java Mon Mar 6 15:06:57 2017
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public abstract class AbstractLink<T extends LinkEndpoint<?>> implements Link_1_0
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLink.class);
+
+ protected final String _linkName;
+ protected T _linkEndpoint;
+ protected volatile BaseSource _source;
+ protected volatile BaseTarget _target;
+
+ public AbstractLink(final String linkName)
+ {
+ _linkName = linkName;
+ }
+
+ @Override
+ public final ListenableFuture<T> attach(final Session_1_0 session, final Attach attach)
+ {
+ final ListenableFuture<T> future;
+ try
+ {
+ boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
+ boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null);
+
+ if (isAttachingLocalTerminusNull)
+ {
+ future = recoverLink(session, attach);
+ }
+ else if (isLocalTerminusNull)
+ {
+ future = establishLink(session, attach);
+ }
+ else if (_linkEndpoint != null && _linkEndpoint.getSession() != null && !session.equals(_linkEndpoint.getSession()))
+ {
+ future = stealLink(session, attach);
+ }
+ else if (attach.getUnsettled() != null)
+ {
+ future = resumeLink(session, attach);
+ }
+ else
+ {
+ future = reattachLink(session, attach);
+ }
+ }
+ catch (Throwable t)
+ {
+ return Futures.immediateFailedFuture(t);
+ }
+ AbstractConfiguredObject.addFutureCallback(future, new FutureCallback<T>()
+ {
+ @Override
+ public void onSuccess(final T result)
+ {
+ _linkEndpoint = result;
+ }
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ }
+ }, MoreExecutors.directExecutor());
+ return future;
+ }
+
+ protected abstract ListenableFuture<T> recoverLink(final Session_1_0 session, final Attach attach);
+
+ protected abstract ListenableFuture<T> establishLink(final Session_1_0 session, final Attach attach);
+
+ protected abstract ListenableFuture<T> stealLink(final Session_1_0 session, final Attach attach);
+
+ protected abstract ListenableFuture<T> resumeLink(final Session_1_0 session, final Attach attach);
+
+ protected abstract ListenableFuture<T> reattachLink(final Session_1_0 session, final Attach attach);
+
+ @Override
+ public void linkClosed()
+ {
+ discardEndpoint();
+ }
+
+ @Override
+ public void discardEndpoint()
+ {
+ _linkEndpoint = null;
+ }
+
+ @Override
+ public final String getName()
+ {
+ return _linkName;
+ }
+
+ @Override
+ public BaseSource getSource()
+ {
+ return _source;
+ }
+
+ @Override
+ public BaseTarget getTarget()
+ {
+ return _target;
+ }
+
+ TerminusDurability getLocalTerminusDurability()
+ {
+ if (_linkEndpoint.getRole() == Role.SENDER)
+ {
+ return ((Source) getSource()).getDurable();
+ }
+ else if (getTarget() instanceof Target)
+ {
+ return ((Target) getTarget()).getDurable();
+ }
+ else
+ {
+ return TerminusDurability.NONE;
+ }
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Mon Mar 6 15:06:57 2017
@@ -607,7 +607,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
BaseTarget target = _linkEndpoint.getTarget();
- return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getName();
+ return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getLinkName();
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Mar 6 15:06:57 2017
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -85,6 +87,7 @@ public class ExchangeDestination extends
destinationCapabilities.add(REJECT_UNROUTABLE);
}
destinationCapabilities.add(TOPIC_CAPABILITY);
+ destinationCapabilities.add(DELAYED_DELIVERY);
_capabilities = destinationCapabilities.toArray(new Symbol[destinationCapabilities.size()]);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Mon Mar 6 15:06:57 2017
@@ -27,7 +27,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -45,8 +49,9 @@ import org.apache.qpid.server.protocol.v
public abstract class LinkEndpoint<T extends Link_1_0>
{
-
- private T _link;
+ private static final Logger LOGGER = LoggerFactory.getLogger(LinkEndpoint.class);
+ private final T _link;
+ private Session_1_0 _session;
private Object _flowTransactionId;
private SenderSettleMode _sendingSettlementMode;
private ReceiverSettleMode _receivingSettlementMode;
@@ -56,6 +61,15 @@ public abstract class LinkEndpoint<T ext
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
private Symbol[] _capabilities;
+ private UnsignedInteger _deliveryCount;
+ private UnsignedInteger _linkCredit;
+ private UnsignedInteger _available;
+ private Boolean _drain;
+ private UnsignedInteger _localHandle;
+ private UnsignedLong _maxMessageSize;
+ private Map<Symbol, Object> _properties;
+
+ protected volatile State _state = State.ATTACH_RECVD;
protected enum State
{
@@ -65,37 +79,37 @@ public abstract class LinkEndpoint<T ext
ATTACHED,
DETACH_SENT,
DETACH_RECVD
- };
+ }
- private final String _name;
- private Session_1_0 _session;
+ LinkEndpoint(final T link)
+ {
+ _link = link;
+ }
+ public abstract void start();
- protected volatile State _state = State.DETACHED;
+ public abstract Role getRole();
- private BaseSource _source;
- private BaseTarget _target;
- private UnsignedInteger _deliveryCount;
- private UnsignedInteger _linkCredit;
- private UnsignedInteger _available;
- private Boolean _drain;
- private UnsignedInteger _localHandle;
- private UnsignedLong _maxMessageSize;
- private Map<Symbol, Object> _properties;
+ public abstract void flowStateChanged();
- LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
- {
- _session = sessionEndpoint;
+ public abstract void receiveFlow(final Flow flow);
+
+ protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
+ protected abstract void remoteDetachedPerformDetach(final Detach detach);
- _name = attach.getName();
+ protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+
+ public void attachReceived(final Attach attach) throws AmqpErrorException
+ {
+ _sendingSettlementMode = attach.getSndSettleMode();
+ _receivingSettlementMode = attach.getRcvSettleMode();
_initialUnsettledMap = attach.getUnsettled();
_properties = initProperties(attach);
_state = State.ATTACH_RECVD;
}
- public abstract void start();
-
public boolean isStopped()
{
return _stopped;
@@ -111,38 +125,24 @@ public abstract class LinkEndpoint<T ext
}
}
- protected abstract Map<Symbol,Object> initProperties(final Attach attach);
-
- public String getName()
+ public String getLinkName()
{
- return _name;
+ return _link.getName();
}
- public abstract Role getRole();
-
public BaseSource getSource()
{
- return _source;
- }
-
- public NamedAddressSpace getAddressSpace()
- {
- return getSession().getConnection().getAddressSpace();
- }
-
- public void setSource(final BaseSource source)
- {
- _source = source;
+ return _link.getSource();
}
public BaseTarget getTarget()
{
- return _target;
+ return _link.getTarget();
}
- public void setTarget(final BaseTarget target)
+ public NamedAddressSpace getAddressSpace()
{
- _target = target;
+ return getSession().getConnection().getAddressSpace();
}
public void setDeliveryCount(final UnsignedInteger deliveryCount)
@@ -199,12 +199,6 @@ public abstract class LinkEndpoint<T ext
}
}
- protected abstract void remoteDetachedPerformDetach(final Detach detach);
-
- public void receiveFlow(final Flow flow)
- {
- }
-
public void addUnsettled(final Delivery unsettled)
{
}
@@ -221,8 +215,6 @@ public abstract class LinkEndpoint<T ext
}
}
- protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
public void settle(final Binary deliveryTag)
{
@@ -233,51 +225,6 @@ public abstract class LinkEndpoint<T ext
_localHandle = localHandle;
}
- void receiveAttach(final Attach attach)
- {
- switch (_state)
- {
- case ATTACH_SENT:
- {
-
- _state = State.ATTACHED;
-
- _initialUnsettledMap = attach.getUnsettled();
- /* TODO - don't yet handle:
-
- attach.getProperties();
- attach.getDurable();
- attach.getExpiryPolicy();
- attach.getTimeout();
- */
-
- break;
- }
-
- case DETACHED:
- {
- _state = State.ATTACH_RECVD;
- break;
- }
-
-
- }
-
- if (attach.getRole() == Role.SENDER)
- {
- _source = attach.getSource();
- }
- else
- {
- _target = attach.getTarget();
- }
-
- if (getRole() == Role.SENDER)
- {
- _maxMessageSize = attach.getMaxMessageSize();
- }
- }
-
boolean isAttached()
{
return _state == State.ATTACHED;
@@ -293,21 +240,32 @@ public abstract class LinkEndpoint<T ext
return _session;
}
- public void setSession(final Session_1_0 session)
+ public void associateSession(final Session_1_0 session)
{
+ if (session == null)
+ {
+ throw new IllegalStateException("To dissociate session from Endpoint call LinkEndpoint#dissociateSession() "
+ + "instead of LinkEndpoint#associate(null)");
+ }
_session = session;
}
+ public void dissociateSession()
+ {
+ setLocalHandle(null);
+ _session = null;
+ getLink().discardEndpoint();
+ }
+
UnsignedInteger getLocalHandle()
{
return _localHandle;
}
-
public void attach()
{
Attach attachToSend = new Attach();
- attachToSend.setName(getName());
+ attachToSend.setName(getLinkName());
attachToSend.setRole(getRole());
attachToSend.setHandle(getLocalHandle());
attachToSend.setSource(getSource());
@@ -332,14 +290,13 @@ public abstract class LinkEndpoint<T ext
_state = State.ATTACHED;
break;
default:
- // TODO ERROR
+ throw new UnsupportedOperationException(_state.toString());
}
getSession().sendAttach(attachToSend);
}
-
public void detach()
{
detach(null, false);
@@ -350,14 +307,14 @@ public abstract class LinkEndpoint<T ext
detach(null, true);
}
- public void close(Error error)
+ public void detach(Error error)
{
- detach(error, true);
+ detach(error, false);
}
- public void detach(Error error)
+ public void close(Error error)
{
- detach(error, false);
+ detach(error, true);
}
private void detach(Error error, boolean close)
@@ -375,20 +332,26 @@ public abstract class LinkEndpoint<T ext
return;
}
- if (!(getSession().getSessionState() == SessionState.END_RECVD || getSession().isEnded()))
+ if (getSession().getSessionState() != SessionState.END_RECVD && !getSession().isEnded())
{
Detach detach = new Detach();
detach.setHandle(getLocalHandle());
if (close)
+ {
detach.setClosed(close);
+ }
detach.setError(error);
getSession().sendDetach(detach);
}
- }
-
-
+ if (close)
+ {
+ dissociateSession();
+ _link.linkClosed();
+ }
+ setLocalHandle(null);
+ }
public void setTransactionId(final Object txnId)
{
@@ -477,16 +440,6 @@ public abstract class LinkEndpoint<T ext
return _link;
}
- public void setLink(final T link)
- {
- _link = link;
- }
-
- public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
- {
- _sendingSettlementMode = sendingSettlementMode;
- }
-
public SenderSettleMode getSendingSettlementMode()
{
return _sendingSettlementMode;
@@ -497,11 +450,6 @@ public abstract class LinkEndpoint<T ext
return _receivingSettlementMode;
}
- public void setReceivingSettlementMode(ReceiverSettleMode receivingSettlementMode)
- {
- _receivingSettlementMode = receivingSettlementMode;
- }
-
public List<Symbol> getCapabilities()
{
return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
@@ -517,9 +465,6 @@ public abstract class LinkEndpoint<T ext
return _initialUnsettledMap;
}
-
- public abstract void flowStateChanged();
-
public void setLocalUnsettled(Map unsettled)
{
_localUnsettled = unsettled;
@@ -528,12 +473,12 @@ public abstract class LinkEndpoint<T ext
@Override public String toString()
{
return "LinkEndpoint{" +
- "_name='" + _name + '\'' +
+ "_name='" + getLinkName() + '\'' +
", _session=" + _session +
", _state=" + _state +
", _role=" + getRole() +
- ", _source=" + _source +
- ", _target=" + _target +
+ ", _source=" + getSource() +
+ ", _target=" + getTarget() +
", _transferCount=" + _deliveryCount +
", _linkCredit=" + _linkCredit +
", _available=" + _available +
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java (from r1785348, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java&r1=1785348&r2=1785660&rev=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java Mon Mar 6 15:06:57 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,24 +15,27 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
+
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
+import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
-public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
+@PluggableService
+public class LinkRegistryFactoryImpl implements LinkRegistryFactory
{
- public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint)
+ @Override
+ public String getType()
{
- ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+ return "org.apache.qpid.LinkRegistryFactory.amqp_1_0";
}
-
@Override
- public void setLinkAttachmentToNull()
+ public LinkRegistry create(final NamedAddressSpace addressSpace)
{
+ return new LinkRegistryImpl(addressSpace);
}
-
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1785660&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Mon Mar 6 15:06:57 2017
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.LinkRegistry;
+
+public class LinkRegistryImpl implements LinkRegistry
+{
+ private final Map<String, Map<String, SendingLink_1_0>> _sendingLinkRegistry = new HashMap<>();
+ private final Map<String, Map<String, StandardReceivingLink_1_0>> _receivingLinkRegistry = new HashMap<>();
+ private final Map<String, Map<String, TxnCoordinatorReceivingLink_1_0>> _coordinatorLinkRegistry = new HashMap<>();
+ private final NamedAddressSpace _addressSpace;
+
+ LinkRegistryImpl(final NamedAddressSpace addressSpace)
+ {
+ _addressSpace = addressSpace;
+ }
+
+ @Override
+ public synchronized <T extends LinkModel> T getLink(final String remoteContainerId, final String linkName, final Class<T> type)
+ {
+ if (SendingLink_1_0.class.equals(type))
+ {
+ return (T) getSendingLink(remoteContainerId, linkName);
+ }
+ else if (StandardReceivingLink_1_0.class.equals(type))
+ {
+ return (T) getReceivingLink(remoteContainerId, linkName);
+ }
+ else if (TxnCoordinatorReceivingLink_1_0.class.equals(type))
+ {
+ return (T) getCoordinatorLink(remoteContainerId, linkName);
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException(String.format("Unsupported link type: '%s'", type.getSimpleName()));
+ }
+ }
+
+ private TxnCoordinatorReceivingLink_1_0 getCoordinatorLink(final String remoteContainerId, final String linkName)
+ {
+ Map<String, TxnCoordinatorReceivingLink_1_0> containerRegistry = _coordinatorLinkRegistry.get(remoteContainerId);
+ if (containerRegistry == null)
+ {
+ containerRegistry = new HashMap<>();
+ _coordinatorLinkRegistry.put(remoteContainerId, containerRegistry);
+ }
+ TxnCoordinatorReceivingLink_1_0 link = containerRegistry.get(linkName);
+ if (link == null)
+ {
+ link = new TxnCoordinatorReceivingLink_1_0(linkName);
+ containerRegistry.put(linkName, link);
+ }
+ return link;
+ }
+
+ private SendingLink_1_0 getSendingLink(final String remoteContainerId, final String linkName)
+ {
+ Map<String, SendingLink_1_0> containerRegistry = _sendingLinkRegistry.get(remoteContainerId);
+ if (containerRegistry == null)
+ {
+ containerRegistry = new HashMap<>();
+ _sendingLinkRegistry.put(remoteContainerId, containerRegistry);
+ }
+ SendingLink_1_0 link = containerRegistry.get(linkName);
+ if (link == null)
+ {
+ link = new SendingLink_1_0(linkName);
+ containerRegistry.put(linkName, link);
+ }
+ return link;
+ }
+
+ private StandardReceivingLink_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
+ {
+ Map<String, StandardReceivingLink_1_0> containerRegistry = _receivingLinkRegistry.get(remoteContainerId);
+ if (containerRegistry == null)
+ {
+ containerRegistry = new HashMap<>();
+ _receivingLinkRegistry.put(remoteContainerId, containerRegistry);
+ }
+ StandardReceivingLink_1_0 link = containerRegistry.get(linkName);
+ if (link == null)
+ {
+ link = new StandardReceivingLink_1_0(linkName);
+ containerRegistry.put(linkName, link);
+ }
+ return link;
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Mon Mar 6 15:06:57 2017
@@ -20,12 +20,24 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.protocol.LinkModel;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
public interface Link_1_0 extends LinkModel
{
+ ListenableFuture<? extends LinkEndpoint<?>> attach(Session_1_0 session, final Attach attach);
+
+ void linkClosed();
+
+ void discardEndpoint();
+
+ String getName();
+
+ BaseSource getSource();
+ BaseTarget getTarget();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java Mon Mar 6 15:06:57 2017
@@ -25,12 +25,12 @@ import static org.apache.qpid.server.mod
import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.message.MessageSource;
public class MessageSourceDestination implements SendingDestination
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Mon Mar 6 15:06:57 2017
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import static org.apache.qpid.server.protocol.v1_0.Session_1_0.DELAYED_DELIVERY;
+
import java.util.Arrays;
import java.util.Collections;
@@ -222,8 +224,9 @@ public class NodeReceivingDestination im
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[1];
+ Symbol[] capabilities = new Symbol[2];
capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+ capabilities[1] = DELAYED_DELIVERY;
return capabilities;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Mon Mar 6 15:06:57 2017
@@ -25,7 +25,6 @@ import java.util.Collections;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Mon Mar 6 15:06:57 2017
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -43,6 +44,7 @@ public abstract class ReceivingLinkEndpo
private UnsignedInteger _lastDeliveryId;
private ReceivingDestination _receivingDestination;
+ private final SectionDecoder _sectionDecoder;
private static class TransientState
{
@@ -89,12 +91,10 @@ public abstract class ReceivingLinkEndpo
private UnsignedInteger _drainLimit;
- public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
+ public ReceivingLinkEndpoint(final ReceivingLink_1_0 link, final SectionDecoder sectionDecoder)
{
- super(session, attach);
- setDeliveryCount(attach.getInitialDeliveryCount());
- setSendingSettlementMode(attach.getSndSettleMode());
- setReceivingSettlementMode(attach.getRcvSettleMode());
+ super(link);
+ _sectionDecoder = sectionDecoder;
}
@Override
@@ -168,7 +168,6 @@ public abstract class ReceivingLinkEndpo
@Override public void receiveFlow(final Flow flow)
{
- super.receiveFlow(flow);
_remoteDrain = Boolean.TRUE.equals((Boolean) flow.getDrain());
setAvailable(flow.getAvailable());
setDeliveryCount(flow.getDeliveryCount());
@@ -283,9 +282,10 @@ public abstract class ReceivingLinkEndpo
}
}
-
-
-
+ SectionDecoder getSectionDecoder()
+ {
+ return _sectionDecoder;
+ }
@Override
public void settle(Binary deliveryTag)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Mon Mar 6 15:06:57 2017
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
public interface ReceivingLink_1_0 extends Link_1_0
{
- void setLinkAttachmentToNull();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Mon Mar 6 15:06:57 2017
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -95,12 +96,12 @@ public class SendingLinkEndpoint extends
private ConsumerTarget_1_0 _consumerTarget;
private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
- public SendingLinkEndpoint(final Session_1_0 session, final Attach attach)
+ public SendingLinkEndpoint(final SendingLink_1_0 link)
{
- super(session, attach);
- setSendingSettlementMode(attach.getSndSettleMode());
- setReceivingSettlementMode(attach.getRcvSettleMode());
- init();
+ super(link);
+ setDeliveryCount(UnsignedInteger.valueOf(0));
+ setAvailable(UnsignedInteger.valueOf(0));
+ setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
}
@Override
@@ -108,8 +109,9 @@ public class SendingLinkEndpoint extends
{
}
- public void doStuff(final SendingDestination destination) throws AmqpErrorException
+ public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
{
+ // TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
_destination = destination;
final Source source = (Source) getSource();
@@ -206,11 +208,11 @@ public class SendingLinkEndpoint extends
if(getTarget() instanceof Target)
{
Target target = (Target) getTarget();
- name = target.getAddress() == null ? getName() : target.getAddress();
+ name = target.getAddress() == null ? getLinkName() : target.getAddress();
}
else
{
- name = getName();
+ name = getLinkName();
}
_consumer = _destination.getMessageSource()
@@ -284,12 +286,6 @@ public class SendingLinkEndpoint extends
}
}
- private void init()
- {
- setDeliveryCount(UnsignedInteger.valueOf(0));
- setAvailable(UnsignedInteger.valueOf(0));
- }
-
@Override
public Role getRole()
{
@@ -301,9 +297,9 @@ public class SendingLinkEndpoint extends
return _priority;
}
- public void setDurability(final TerminusDurability durability)
+ public TerminusDurability getTerminusDurability()
{
- _durability = durability;
+ return getLink().getLocalTerminusDurability();
}
public boolean transfer(final Transfer xfr, final boolean decrementCredit)
@@ -358,7 +354,6 @@ public class SendingLinkEndpoint extends
@Override
public void receiveFlow(final Flow flow)
{
- super.receiveFlow(flow);
UnsignedInteger t = flow.getDeliveryCount();
UnsignedInteger c = flow.getLinkCredit();
setDrain(flow.getDrain());
@@ -423,8 +418,8 @@ public class SendingLinkEndpoint extends
//TODO
// if not durable or close
if (Boolean.TRUE.equals(detach.getClosed())
- || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals(
- _durability)))
+ || !(TerminusDurability.UNSETTLED_STATE.equals(getTerminusDurability())
+ || TerminusDurability.CONFIGURATION.equals(getTerminusDurability())))
{
Modified state = new Modified();
@@ -436,11 +431,9 @@ public class SendingLinkEndpoint extends
}
_unsettledActionMap.clear();
- close();
-
if (getDestination() instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
+ && (getTerminusDurability() == TerminusDurability.CONFIGURATION
+ || getTerminusDurability() == TerminusDurability.UNSETTLED_STATE))
{
try
{
@@ -452,30 +445,25 @@ public class SendingLinkEndpoint extends
catch (AccessControlException e)
{
LOGGER.error("Error unregistering subscription", e);
- detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
+ close(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
}
catch (IllegalStateException e)
{
- detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
+ close(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
}
catch (NotFoundException e)
{
- detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
+ close(new Error(AmqpError.NOT_FOUND, e.getMessage()));
}
}
+
+ close();
}
else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
{
- try
- {
- getLink().setLinkAttachment(null, null);
- }
- catch (AmqpErrorException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- getConsumerTarget().flowStateChanged();
detach();
+ dissociateSession();
+ getConsumerTarget().updateNotifyWorkDesired();
}
else
{
@@ -553,66 +541,45 @@ public class SendingLinkEndpoint extends
return _transactionId;
}
- public void doLinkAttachment(final Session_1_0 session, final MessageInstanceConsumer consumer) throws AmqpErrorException
+ @Override
+ public void attachReceived(final Attach attach) throws AmqpErrorException
{
- if (session != null)
- {
- createConsumerTarget();
-
- setSession(session);
- _resumeAcceptedTransfers.clear();
- _resumeFullTransfers.clear();
- final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
- Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
- Map initialUnsettledMap = getInitialUnsettledMap();
+ super.attachReceived(attach);
+ final MessageInstanceConsumer consumer = getConsumer();
+ createConsumerTarget();
+ _resumeAcceptedTransfers.clear();
+ _resumeFullTransfers.clear();
+ final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
+ Map initialUnsettledMap = getInitialUnsettledMap();
- for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+ for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ final MessageInstance queueEntry = entry.getValue();
+ if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
- Binary deliveryTag = entry.getKey();
- final MessageInstance queueEntry = entry.getValue();
- if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
- {
- queueEntry.setRedelivered();
- queueEntry.release(consumer);
- _unsettledMap2.remove(deliveryTag);
- }
- else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
- {
- Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+ queueEntry.setRedelivered();
+ queueEntry.release(consumer);
+ _unsettledMap2.remove(deliveryTag);
+ }
+ else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+ {
+ Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
- if (outcome instanceof Accepted)
- {
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
- {
- if (queueEntry.acquire() || queueEntry.isAcquired())
- {
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
- }
- }
- else if (outcome instanceof Released)
+ if (outcome instanceof Accepted)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+ if (consumer.acquires())
{
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
+ if (queueEntry.acquire() || queueEntry.isAcquired())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
{
public void postCommit()
{
- queueEntry.release(consumer);
+ queueEntry.delete();
}
public void onRollback()
@@ -621,16 +588,34 @@ public class SendingLinkEndpoint extends
});
}
}
- //_unsettledMap.remove(deliveryTag);
- initialUnsettledMap.remove(deliveryTag);
- _resumeAcceptedTransfers.add(deliveryTag);
}
- else
+ else if (outcome instanceof Released)
{
- _resumeFullTransfers.add(queueEntry);
- // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+ if (consumer.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.release(consumer);
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
}
- // TODO - else
+ //_unsettledMap.remove(deliveryTag);
+ initialUnsettledMap.remove(deliveryTag);
+ _resumeAcceptedTransfers.add(deliveryTag);
+ }
+ else
+ {
+ _resumeFullTransfers.add(queueEntry);
+ // exists in receivers map, but not yet got an outcome ... should resend with resume = true
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1785660&r1=1785659&r2=1785660&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Mar 6 15:06:57 2017
@@ -20,28 +20,197 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Collections;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-public class SendingLink_1_0 implements Link_1_0
+public class SendingLink_1_0 extends AbstractLink<SendingLinkEndpoint>
{
+ public SendingLink_1_0(final String linkName)
+ {
+ super(linkName);
+ }
+
+ @Override
+ protected ListenableFuture<SendingLinkEndpoint> stealLink(final Session_1_0 session, final Attach attach)
+ {
+ throw new UnsupportedOperationException("Link stealing is not implemented yet.");
+ /*
+ final SettableFuture<SendingLinkEndpoint> returnFuture = SettableFuture.create();
+ _linkEndpoint.getSession().doOnIOThreadAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _linkEndpoint.close(new Error(LinkError.STOLEN,
+ String.format("Link is being stolen by connection '%s'",
+ session.getConnection())));
+ try
+ {
+ returnFuture.set(attach(session, attach).get());
+ }
+ catch (InterruptedException e)
+ {
+ returnFuture.setException(e);
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ returnFuture.setException(e.getCause());
+ }
+ }
+ });
+ return returnFuture;
+ */
+ }
- private volatile SendingLinkEndpoint _linkEndpoint;
+ @Override
+ protected ListenableFuture<SendingLinkEndpoint> reattachLink(final Session_1_0 session, final Attach attach)
+ {
+ if (_linkEndpoint == null)
+ {
+ _linkEndpoint = new SendingLinkEndpoint(this);
+ }
+
+ _source = new Source();
+ _target = attach.getTarget();
+
+ try
+ {
+ _linkEndpoint.associateSession(session);
+
+ Source attachSource = (Source) attach.getSource();
+ final SendingDestination destination = session.getSendingDestination(attach.getName(), attachSource);
+ ((Source) getSource()).setAddress(attachSource.getAddress());
+ ((Source) getSource()).setDynamic(attachSource.getDynamic());
+ ((Source) getSource()).setDurable(attachSource.getDurable());
+ ((Source) getSource()).setExpiryPolicy(attachSource.getExpiryPolicy());
+ ((Source) getSource()).setDistributionMode(attachSource.getDistributionMode());
+ ((Source) getSource()).setFilter(attachSource.getFilter());
+ ((Source) getSource()).setCapabilities(destination.getCapabilities());
+ _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+ _linkEndpoint.attachReceived(attach);
+
+ if (destination instanceof ExchangeDestination)
+ {
+ ExchangeDestination exchangeDestination = (ExchangeDestination) destination;
+ exchangeDestination.getQueue()
+ .setAttributes(Collections.<String, Object>singletonMap(Queue.DESIRED_STATE,
+ State.ACTIVE));
+ }
+ }
+ catch (AmqpErrorException e)
+ {
+ rejectLink(session, attach);
+ }
+ return Futures.immediateFuture(_linkEndpoint);
+ }
- public SendingLink_1_0(final SendingLinkEndpoint linkEndpoint)
+ @Override
+ protected ListenableFuture<SendingLinkEndpoint> resumeLink(final Session_1_0 session, final Attach attach)
{
- _linkEndpoint = linkEndpoint;
+ if (getSource() == null)
+ {
+ throw new IllegalStateException("Terminus should be set when resuming a Link.");
+ }
+ if (attach.getSource() == null)
+ {
+ throw new IllegalStateException("Attach.getSource should not be null when resuming a Link. That would be recovering the Link.");
+ }
+
+ Source newSource = (Source) attach.getSource();
+ Source oldSource = (Source) getSource();
+
+ try
+ {
+ if (_linkEndpoint == null)
+ {
+ _linkEndpoint = new SendingLinkEndpoint(this);
+
+ final SendingDestination destination = session.getSendingDestination(getName(), oldSource);
+ _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+ }
+
+ if (_linkEndpoint.getDestination() instanceof ExchangeDestination
+ && !Boolean.TRUE.equals(newSource.getDynamic()))
+ {
+ final SendingDestination newDestination =
+ session.getSendingDestination(_linkEndpoint.getLinkName(), newSource);
+ if (session.updateSourceForSubscription(_linkEndpoint, newSource, newDestination))
+ {
+ _linkEndpoint.setDestination(newDestination);
+ }
+ }
+
+ _linkEndpoint.associateSession(session);
+ _linkEndpoint.attachReceived(attach);
+
+ _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+ }
+ catch (AmqpErrorException e)
+ {
+ rejectLink(session, attach);
+ }
+ return Futures.immediateFuture(_linkEndpoint);
}
- public SendingLinkEndpoint getEndpoint()
+ @Override
+ protected ListenableFuture<SendingLinkEndpoint> recoverLink(final Session_1_0 session, final Attach attach)
{
- return _linkEndpoint;
+ if (_source == null)
+ {
+ return rejectLink(session, attach);
+ }
+
+ _target = attach.getTarget();
+
+ try
+ {
+ if (_linkEndpoint == null)
+ {
+ _linkEndpoint = new SendingLinkEndpoint(this);
+
+ final SendingDestination destination = session.getSendingDestination(getName(), (Source) _source);
+ _linkEndpoint.prepareConsumerOptionsAndFilters(destination);
+ }
+
+ _linkEndpoint.associateSession(session);
+ _linkEndpoint.attachReceived(attach);
+
+ _linkEndpoint.setLocalUnsettled(_linkEndpoint.getUnsettledOutcomeMap());
+ }
+ catch (AmqpErrorException e)
+ {
+ rejectLink(session, attach);
+ }
+
+ return Futures.immediateFuture(_linkEndpoint);
+ }
+
+ @Override
+ protected ListenableFuture<SendingLinkEndpoint> establishLink(final Session_1_0 session, final Attach attach)
+ {
+ if (_linkEndpoint != null || getSource() != null)
+ {
+ throw new IllegalStateException("LinkEndpoint and Source should be null when establishing a Link.");
+ }
+
+ return reattachLink(session, attach);
}
- public synchronized void setLinkAttachment(final Session_1_0 session,
- final SendingLinkEndpoint linkEndpoint) throws AmqpErrorException
+ private ListenableFuture<SendingLinkEndpoint> rejectLink(final Session_1_0 session, final Attach attach)
{
- _linkEndpoint = linkEndpoint;
- _linkEndpoint.doLinkAttachment(session, getEndpoint().getConsumer());
+ _linkEndpoint = new SendingLinkEndpoint(this);
+ _linkEndpoint.associateSession(session);
+ _source = null;
+ return Futures.immediateFuture(_linkEndpoint);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org