You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/28 14:20:41 UTC
svn commit: r1771747 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: ExchangeDestination.java NodeReceivingDestination.java Session_1_0.java
Author: rgodfrey
Date: Mon Nov 28 14:20:41 2016
New Revision: 1771747
URL: http://svn.apache.org/viewvc?rev=1771747&view=rev
Log:
QPID-7546: Fix logging of discarded messages for ExchangeLoggingTest
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1771747&r1=1771746&r2=1771747&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Nov 28 14:20:41 2016
@@ -26,6 +26,7 @@ import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -98,14 +99,19 @@ public class ExchangeDestination impleme
return null;
}};
+ final String routingAddress = getRoutingAddress(message);
int enqueues = _exchange.send(message,
- getRoutingAddress(message),
+ routingAddress,
instanceProperties,
txn,
null);
+ if(enqueues == 0)
+ {
+ _exchange.getEventLogger().message(ExchangeMessages.DISCARDMSG(_exchange.getName(), routingAddress));
+ }
- return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(getRoutingAddress(message)) : ACCEPTED;
+ return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(routingAddress) : ACCEPTED;
}
private Outcome createdRejectedOutcome(final String routingAddress)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1771747&r1=1771746&r2=1771747&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Mon Nov 28 14:20:41 2016
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol.
import java.util.Arrays;
import java.util.Collections;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.Exchange;
@@ -43,6 +45,7 @@ public class NodeReceivingDestination im
public static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private final boolean _discardUnroutable;
+ private final EventLogger _eventLogger;
private MessageDestination _destination;
private TerminusDurability _durability;
@@ -52,12 +55,14 @@ public class NodeReceivingDestination im
public NodeReceivingDestination(MessageDestination destination,
TerminusDurability durable,
TerminusExpiryPolicy expiryPolicy,
- final String address, final Symbol[] capabilities)
+ final String address, final Symbol[] capabilities,
+ final EventLogger eventLogger)
{
_destination = destination;
_durability = durable;
_expiryPolicy = expiryPolicy;
_address = address;
+ _eventLogger = eventLogger;
_discardUnroutable = destination instanceof Exchange
&& ((capabilities != null && Arrays.asList(capabilities).contains(DISCARD_UNROUTABLE))
|| ((Exchange)destination).getUnroutableMessageBehaviour() == Exchange.UnroutableMessageBehaviour.DISCARD);
@@ -99,6 +104,10 @@ public class NodeReceivingDestination im
int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
+ if(enqueues == 0)
+ {
+ _eventLogger.message(ExchangeMessages.DISCARDMSG(_destination.getName(), routingAddress));
+ }
return enqueues == 0 && !_discardUnroutable ? createdRejectedOutcome(message) : ACCEPTED;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1771747&r1=1771746&r2=1771747&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Nov 28 14:20:41 2016
@@ -967,7 +967,8 @@ public class Session_1_0 implements AMQS
MessageDestination messageDestination = getAddressSpace().getDefaultDestination();
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
target.getExpiryPolicy(), "",
- target.getCapabilities());
+ target.getCapabilities(),
+ _connection.getEventLogger());
target.setCapabilities(destination.getCapabilities());
if(_blockingEntities.contains(messageDestination))
@@ -1008,7 +1009,7 @@ public class Session_1_0 implements AMQS
{
destination =
new NodeReceivingDestination(messageDestination, target.getDurable(),
- target.getExpiryPolicy(), addr, target.getCapabilities());
+ target.getExpiryPolicy(), addr, target.getCapabilities(), _connection.getEventLogger());
target.setCapabilities(destination.getCapabilities());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org