You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/01/05 17:19:34 UTC
svn commit: r1777483 -
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: rgodfrey
Date: Thu Jan 5 17:19:33 2017
New Revision: 1777483
URL: http://svn.apache.org/viewvc?rev=1777483&view=rev
Log:
QPID-7568 : Address review comments by [~k-wall]
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Thu Jan 5 17:19:33 2017
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
@@ -134,6 +135,12 @@ public class ExchangeDestination impleme
}
@Override
+ public MessageDestination getMessageDestination()
+ {
+ return _exchange;
+ }
+
+ @Override
public void authorizePublish(final SecurityToken securityToken,
final String routingAddress)
{
@@ -232,9 +239,8 @@ public class ExchangeDestination impleme
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[2];
+ Symbol[] capabilities = new Symbol[1];
capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
- capabilities[1] = DELAYED_DELIVERY;
return capabilities;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Thu Jan 5 17:19:33 2017
@@ -21,15 +21,18 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -52,6 +55,7 @@ public abstract class LinkEndpoint<T ext
private UnsignedInteger _lastSentCreditLimit;
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
+ private Symbol[] _capabilities;
private enum State
{
@@ -303,6 +307,7 @@ public abstract class LinkEndpoint<T ext
attachToSend.setRcvSettleMode(getReceivingSettlementMode());
attachToSend.setUnsettled(_localUnsettled);
attachToSend.setProperties(_properties);
+ attachToSend.setOfferedCapabilities(_capabilities);
if (getRole() == Role.SENDER)
{
@@ -488,6 +493,16 @@ public abstract class LinkEndpoint<T ext
_receivingSettlementMode = receivingSettlementMode;
}
+ public List<Symbol> getCapabilities()
+ {
+ return _capabilities == null ? null : Collections.unmodifiableList(Arrays.asList(_capabilities));
+ }
+
+ public void setCapabilities(Collection<Symbol> capabilities)
+ {
+ _capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
+ }
+
public Map getInitialUnsettledMap()
{
return _initialUnsettledMap;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Thu Jan 5 17:19:33 2017
@@ -131,6 +131,12 @@ public class NodeReceivingDestination im
}
@Override
+ public MessageDestination getMessageDestination()
+ {
+ return _destination;
+ }
+
+ @Override
public void authorizePublish(final SecurityToken securityToken,
final String routingAddress)
{
@@ -203,9 +209,8 @@ public class NodeReceivingDestination im
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[2];
+ Symbol[] capabilities = new Symbol[1];
capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
- capabilities[1] = DELAYED_DELIVERY;
return capabilities;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Thu Jan 5 17:19:33 2017
@@ -20,17 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -124,13 +121,8 @@ public class QueueDestination extends Me
}
@Override
- public Symbol[] getCapabilities()
+ public MessageDestination getMessageDestination()
{
- Set<Symbol> capabilities = new HashSet<>(Arrays.asList(super.getCapabilities()));
- if(_queue.isHoldOnPublishEnabled())
- {
- capabilities.add(DELAYED_DELIVERY);
- }
- return capabilities.toArray(new Symbol[capabilities.size()]);
+ return _queue;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Thu Jan 5 17:19:33 2017
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -34,7 +35,6 @@ public interface ReceivingDestination ex
Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
- Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
Outcome[] getOutcomes();
@@ -51,4 +51,6 @@ public interface ReceivingDestination ex
String getAddress();
void authorizePublish(SecurityToken securityToken, final String routingAddress);
+
+ MessageDestination getMessageDestination();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1777483&r1=1777482&r2=1777483&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Jan 5 17:19:33 2017
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -114,6 +115,7 @@ import org.apache.qpid.transport.network
public class Session_1_0 implements AMQSessionModel<Session_1_0, ConsumerTarget_1_0>, LogSubject
{
+ public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
private static final Logger _logger = LoggerFactory.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private static final EnumSet<SessionState> END_STATES =
@@ -776,7 +778,7 @@ public class Session_1_0 implements AMQS
Error error = null;
final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
-
+ Set<Symbol> capabilities = new HashSet<>();
if (endpoint.getRole() == Role.SENDER)
{
@@ -910,175 +912,176 @@ public class Session_1_0 implements AMQS
}
}
- else
+ else if (endpoint.getTarget() instanceof Coordinator)
{
- if (endpoint.getTarget() instanceof Coordinator)
+ Coordinator coordinator = (Coordinator) endpoint.getTarget();
+ TxnCapability[] coordinatorCapabilities = coordinator.getCapabilities();
+ boolean localTxn = false;
+ boolean multiplePerSession = false;
+ if (coordinatorCapabilities != null)
{
- Coordinator coordinator = (Coordinator) endpoint.getTarget();
- TxnCapability[] capabilities = coordinator.getCapabilities();
- boolean localTxn = false;
- boolean multiplePerSession = false;
- if (capabilities != null)
+ for (TxnCapability capability : coordinatorCapabilities)
{
- for (TxnCapability capability : capabilities)
+ if (capability.equals(TxnCapability.LOCAL_TXN))
{
- if (capability.equals(TxnCapability.LOCAL_TXN))
- {
- localTxn = true;
- }
- else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
- {
- multiplePerSession = true;
- }
- else
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_IMPLEMENTED);
- error.setDescription("Unsupported capability: " + capability);
- break;
- }
+ localTxn = true;
+ }
+ else if (capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+ {
+ multiplePerSession = true;
+ }
+ else
+ {
+ error = new Error();
+ error.setCondition(AmqpError.NOT_IMPLEMENTED);
+ error.setDescription("Unsupported capability: " + capability);
+ break;
}
}
+ }
/* if(!localTxn)
{
- capabilities.add(TxnCapabilities.LOCAL_TXN);
+ coordinatorCapabilities.add(TxnCapabilities.LOCAL_TXN);
}*/
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
- new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
- this,
- receivingLinkEndpoint,
- _openTransactions);
- receivingLinkEndpoint.setLink(coordinatorLink);
- link = coordinatorLink;
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
+ new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
+ this,
+ receivingLinkEndpoint,
+ _openTransactions);
+ receivingLinkEndpoint.setLink(coordinatorLink);
+ link = coordinatorLink;
+ }
+ else // standard (non-Coordinator) receiver
+ {
+ StandardReceivingLink_1_0 previousLink =
+ (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
- }
- else
+ if (previousLink == null)
{
- StandardReceivingLink_1_0 previousLink =
- (StandardReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName());
+ Target target = (Target) endpoint.getTarget();
- if (previousLink == null)
+ if (target != null)
{
+ if (Boolean.TRUE.equals(target.getDynamic()))
+ {
- Target target = (Target) endpoint.getTarget();
+ MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
+ target.setAddress(tempQueue.getName());
+ }
- if (target != null)
+ String addr = target.getAddress();
+ if (addr == null || "".equals(addr.trim()))
{
- if (Boolean.TRUE.equals(target.getDynamic()))
- {
+ MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy(), "",
+ target.getCapabilities(),
+ _connection.getEventLogger());
+ target.setCapabilities(destination.getCapabilities());
- MessageDestination tempQueue = createDynamicDestination(target.getDynamicNodeProperties());
- target.setAddress(tempQueue.getName());
+ if (_blockingEntities.contains(messageDestination))
+ {
+ endpoint.setStopped(true);
}
-
- String addr = target.getAddress();
- if (addr == null || "".equals(addr.trim()))
+ }
+ else if (!addr.startsWith("/") && addr.contains("/"))
+ {
+ String[] parts = addr.split("/", 2);
+ Exchange<?> exchange = getExchange(parts[0]);
+ if (exchange != null)
{
- MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
- destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), "",
- target.getCapabilities(),
- _connection.getEventLogger());
- target.setCapabilities(destination.getCapabilities());
-
- if(_blockingEntities.contains(messageDestination))
- {
- endpoint.setStopped(true);
- }
+ ExchangeDestination exchangeDestination =
+ new ExchangeDestination(exchange,
+ target.getDurable(),
+ target.getExpiryPolicy(),
+ parts[0],
+ target.getCapabilities());
+ exchangeDestination.setInitialRoutingAddress(parts[1]);
+ target.setCapabilities(exchangeDestination.getCapabilities());
+ destination = exchangeDestination;
}
- else if (!addr.startsWith("/") && addr.contains("/"))
+ else
{
- String[] parts = addr.split("/", 2);
- Exchange<?> exchange =getExchange(parts[0]);
- if (exchange != null)
- {
- ExchangeDestination exchangeDestination =
- new ExchangeDestination(exchange,
- target.getDurable(),
- target.getExpiryPolicy(),
- parts[0],
- target.getCapabilities());
-
- exchangeDestination.setInitialRoutingAddress(parts[1]);
- target.setCapabilities(exchangeDestination.getCapabilities());
- destination = exchangeDestination;
-
- }
- else
- {
- endpoint.setTarget(null);
- destination = null;
- }
+ endpoint.setTarget(null);
+ destination = null;
+ }
+ }
+ else
+ {
+ MessageDestination messageDestination =
+ getAddressSpace().getAttainedMessageDestination(addr);
+ if (messageDestination != null)
+ {
+ destination =
+ new NodeReceivingDestination(messageDestination,
+ target.getDurable(),
+ target.getExpiryPolicy(),
+ addr,
+ target.getCapabilities(),
+ _connection.getEventLogger());
+ target.setCapabilities(destination.getCapabilities());
}
else
{
- MessageDestination messageDestination =
- getAddressSpace().getAttainedMessageDestination(addr);
- if (messageDestination != null)
+ Queue<?> queue = getQueue(addr);
+ if (queue != null)
{
- destination =
- new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), addr, target.getCapabilities(), _connection.getEventLogger());
- target.setCapabilities(destination.getCapabilities());
+ destination = new QueueDestination(queue, addr);
}
else
{
- Queue<?> queue = getQueue(addr);
- if (queue != null)
- {
-
- destination = new QueueDestination(queue, addr);
- }
- else
- {
- endpoint.setTarget(null);
- destination = null;
- }
-
+ endpoint.setTarget(null);
+ destination = null;
}
}
-
}
- else
+ }
+ else
+ {
+ destination = null;
+ }
+ if (destination != null)
+ {
+ final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+ MessageDestination messageDestination = receivingDestination.getMessageDestination();
+ if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
{
- destination = null;
+ capabilities.add(DELAYED_DELIVERY);
}
- if (destination != null)
+ final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ final StandardReceivingLink_1_0 receivingLink =
+ new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+ getAddressSpace(),
+ receivingDestination);
+
+ receivingLinkEndpoint.setLink(receivingLink);
+ link = receivingLink;
+ if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
+ || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
{
- final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final StandardReceivingLink_1_0 receivingLink =
- new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
- getAddressSpace(),
- (ReceivingDestination) destination);
-
- receivingLinkEndpoint.setLink(receivingLink);
- link = receivingLink;
- if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
- || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
- {
- linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
- }
+ linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
}
}
- else
- {
- ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
- receivingLinkEndpoint.setLink(previousLink);
- link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
-
- }
+ }
+ else
+ {
+ ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+ receivingLinkEndpoint.setLink(previousLink);
+ link = previousLink;
+ endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
}
}
+
+ endpoint.setCapabilities(capabilities);
endpoint.attach();
if (link == null)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org