You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/09/15 14:45:50 UTC
svn commit: r1171079 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/logging/messages/
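broker/src/main/java/org/apache/qpid/server/message/
broker/src/main/java/org/apache/qpid/server/transport/
broker/src/test/java/org/apache/qpid/server/logging/messages/
systests/src/main/java/org/apache/qpid/server/logging/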
Author: kwall
Date: Thu Sep 15 12:45:49 2011
New Revision: 1171079
URL: http://svn.apache.org/viewvc?rev=1171079&view=rev
Log:
QPID-2672: Unroutable persistent messages should be immediately removed from store. Added new operational logging message (EXH-1003 - Discarded Message) that reports when the Broker drops an unroutable message. Added new System Test.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Sep 15 12:45:49 2011
@@ -50,6 +50,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
@@ -315,7 +316,6 @@ public class AMQChannel implements Sessi
try
{
_currentMessage.getStoredMessage().flushToStore();
-
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -324,7 +324,7 @@ public class AMQChannel implements Sessi
}
else
{
- if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+ if(destinationQueues == null || destinationQueues.isEmpty())
{
if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
{
@@ -332,7 +332,7 @@ public class AMQChannel implements Sessi
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Sep 15 12:45:49 2011
@@ -356,7 +356,7 @@ public abstract class AbstractExchange i
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
final ArrayList<? extends BaseQueue> queues = doRoute(message);
- if(queues != null && !queues.isEmpty())
+ if(!queues.isEmpty())
{
_routedMessageCount.incrementAndGet();
_routedMessageSize.addAndGet(message.getSize());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Thu Sep 15 12:45:49 2011
@@ -30,15 +30,12 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ExchangeConfig;
import javax.management.JMException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
public interface Exchange extends ExchangeReferrer, ExchangeConfig
{
@@ -67,7 +64,12 @@ public interface Exchange extends Exchan
void close() throws AMQException;
-
+ /**
+ * Returns a list of queues to which to route this message. If there are
+ * no queues the empty list must be returned.
+ *
+ * @return list of queues to which to route the message.
+ */
ArrayList<? extends BaseQueue> route(InboundMessage message);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties Thu Sep 15 12:45:49 2011
@@ -21,4 +21,5 @@
# 0 - type
# 1 - name
CREATED = EXH-1001 : Create :[ Durable] Type: {0} Name: {1}
-DELETED = EXH-1002 : Deleted
\ No newline at end of file
+DELETED = EXH-1002 : Deleted
+DISCARDMSG = EXH-1003 : Discarded Message : Name: {0} Routing Key: {1}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Thu Sep 15 12:45:49 2011
@@ -34,7 +34,7 @@ import org.apache.qpid.transport.codec.B
import java.nio.ByteBuffer;
import java.lang.ref.SoftReference;
-public class MessageMetaData_0_10 implements StorableMessageMetaData
+public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
{
private Header _header;
private DeliveryProperties _deliveryProps;
@@ -194,6 +194,12 @@ public class MessageMetaData_0_10 implem
return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
}
+ public boolean isRedelivered()
+ {
+ // The *Message* is never redelivered, only queue entries are...
+ return false;
+ }
+
public long getArrivalTime()
{
return _arrivalTime;
@@ -239,4 +245,6 @@ public class MessageMetaData_0_10 implem
}
}
+
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Sep 15 12:45:49 2011
@@ -40,6 +40,7 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.queue.AMQQueue;
@@ -167,7 +168,6 @@ public class ServerSessionDelegate exten
@Override
public void messageSubscribe(Session session, MessageSubscribe method)
{
-
//TODO - work around broken Python tests
if(!method.hasAcceptMode())
{
@@ -284,25 +284,10 @@ public class ServerSessionDelegate exten
}
}
-
@Override
public void messageTransfer(Session ssn, MessageTransfer xfr)
{
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
- Exchange exchange;
- if(xfr.hasDestination())
- {
- exchange = exchangeRegistry.getExchange(xfr.getDestination());
- if(exchange == null)
- {
- exchange = exchangeRegistry.getDefaultExchange();
- }
- }
- else
- {
- exchange = exchangeRegistry.getDefaultExchange();
- }
-
+ final Exchange exchange = getExchangeForMessage(ssn, xfr);
DeliveryProperties delvProps = null;
if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -310,7 +295,7 @@ public class ServerSessionDelegate exten
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
- MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
@@ -320,66 +305,65 @@ public class ServerSessionDelegate exten
return;
}
-
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
- StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
- ByteBuffer body = xfr.getBody();
- if(body != null)
+
+ final Exchange exchangeInUse;
+ ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+ if(queues.isEmpty() && exchange.getAlternateExchange() != null)
+ {
+ final Exchange alternateExchange = exchange.getAlternateExchange();
+ queues = alternateExchange.route(messageMetaData);
+ if (!queues.isEmpty())
+ {
+ exchangeInUse = alternateExchange;
+ }
+ else
+ {
+ exchangeInUse = exchange;
+ }
+ }
+ else
{
- storeMessage.addContent(0, body);
+ exchangeInUse = exchange;
}
- storeMessage.flushToStore();
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
-
- ArrayList<? extends BaseQueue> queues = exchange.route(message);
-
-
- if(queues != null && queues.size() != 0)
+ if(!queues.isEmpty())
{
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
((ServerSession) ssn).enqueue(message, queues);
}
else
{
- if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
+ if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
- {
- RangeSet rejects = new RangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
- }
- else
- {
- Exchange alternate = exchange.getAlternateExchange();
- if(alternate != null)
- {
- queues = alternate.route(message);
- if(queues != null && queues.size() != 0)
- {
- ((ServerSession) ssn).enqueue(message, queues);
- }
- else
- {
- //TODO - log the message discard
- }
- }
- else
- {
- //TODO - log the message discard
- }
-
-
- }
+ RangeSet rejects = new RangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
}
-
-
}
ssn.processed(xfr);
}
+ private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
+ final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+ {
+ final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+ ByteBuffer body = xfr.getBody();
+ if(body != null)
+ {
+ storeMessage.addContent(0, body);
+ }
+ storeMessage.flushToStore();
+ return storeMessage;
+ }
+
@Override
public void messageCancel(Session session, MessageCancel method)
{
@@ -582,6 +566,25 @@ public class ServerSessionDelegate exten
}
+ private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ {
+ final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ Exchange exchange;
+ if(xfr.hasDestination())
+ {
+ exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ if(exchange == null)
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+ }
+ else
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+ return exchange;
+ }
+
private VirtualHost getVirtualHost(Session session)
{
ServerConnection conn = getServerConnection(session);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java Thu Sep 15 12:45:49 2011
@@ -66,7 +66,6 @@ public class ExchangeMessagesTest extend
validateLogMessage(log, "EXH-1001", expected);
}
-
public void testExchangeDeleted()
{
_logMessage = ExchangeMessages.DELETED();
@@ -77,4 +76,21 @@ public class ExchangeMessagesTest extend
validateLogMessage(log, "EXH-1002", expected);
}
+ public void testExchangeDiscardedMessage()
+ {
+ // Get the Default Exchange on the Test Vhost for testing
+ final Exchange exchange = ApplicationRegistry.getInstance().
+ getVirtualHostRegistry().getVirtualHost("test").
+ getExchangeRegistry().getDefaultExchange();
+
+ final String name = exchange.getNameShortString().toString();
+ final String routingKey = "routingKey";
+
+ _logMessage = ExchangeMessages.DISCARDMSG(name, routingKey);
+ List<Object> log = performLog();
+
+ String[] expected = {"Discarded Message :","Name:", name, "Routing Key:", routingKey};
+
+ validateLogMessage(log, "EXH-1003", expected);
+ }
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Thu Sep 15 12:45:49 2011
@@ -20,6 +20,16 @@
*/
package org.apache.qpid.server.logging;
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.framing.AMQFrame;
@@ -28,13 +38,6 @@ import org.apache.qpid.framing.ExchangeD
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Session;
-import java.io.IOException;
-import java.util.List;
-
/**
* Exchange
*
@@ -214,4 +217,38 @@ public class ExchangeLoggingTest extends
}
+ public void testDiscardedMessage() throws Exception
+ {
+ //Ignore broker startup messages
+ _monitor.reset();
+
+ if (!isBroker010())
+ {
+ // Default 0-8..-0-9-1 behaviour is for messages to be rejected (returned to client).
+ setTestClientSystemProperty("qpid.default_mandatory", "false");
+ }
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Do not create consumer so queue is not created and message will be discarded.
+ final MessageProducer producer = _session.createProducer(_queue);
+
+ // Sending message
+ final TextMessage msg = _session.createTextMessage("msg");
+ producer.send(msg);
+
+ final String expectedMessageBody = "Discarded Message : Name: " + _name + " Routing Key: " + _queue.getQueueName();
+
+ // Ensure we have received the EXH log msg.
+ waitForMessage("EXH-1003");
+
+ List<String> results = findMatches(EXH_PREFIX);
+ assertEquals("Result set larger than expected.", 2, results.size());
+
+ final String log = getLogMessage(results, 1);
+ validateMessageID("EXH-1003", log);
+
+ final String message = getMessageString(fromMessage(log));
+ assertEquals("Log Message not as expected", expectedMessageBody, message);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org