You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2011/09/15 14:45:50 UTC

svn commit: r1171079 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/logging/messages/ broker/src/main/java/org/apache/qpid/serve...

Author: kwall
Date: Thu Sep 15 12:45:49 2011
New Revision: 1171079

URL: http://svn.apache.org/viewvc?rev=1171079&view=rev
Log:
QPID-2672: Unroutable persistent messages should be immediately removed from store. Added new operational logging message (EXH-1003 - Discarded Message) that reports when the Broker drops an unroutable message. Added new System Test.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Sep 15 12:45:49 2011
@@ -50,6 +50,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.actors.AMQPChannelActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
@@ -315,7 +316,6 @@ public class AMQChannel implements Sessi
             try
             {
                 _currentMessage.getStoredMessage().flushToStore();
-
                 final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
 
                 if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -324,7 +324,7 @@ public class AMQChannel implements Sessi
                 }
                 else
                 {
-                    if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+                    if(destinationQueues == null || destinationQueues.isEmpty())
                     {
                         if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
                         {
@@ -332,7 +332,7 @@ public class AMQChannel implements Sessi
                         }
                         else
                         {
-                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+                            _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
                         }
 
                     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Sep 15 12:45:49 2011
@@ -356,7 +356,7 @@ public abstract class AbstractExchange i
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
         final ArrayList<? extends BaseQueue> queues = doRoute(message);
-        if(queues != null && !queues.isEmpty())
+        if(!queues.isEmpty())
         {
             _routedMessageCount.incrementAndGet();
             _routedMessageSize.addAndGet(message.getSize());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Thu Sep 15 12:45:49 2011
@@ -30,15 +30,12 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.ExchangeConfig;
 
 import javax.management.JMException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 public interface Exchange extends ExchangeReferrer, ExchangeConfig
 {
@@ -67,7 +64,12 @@ public interface Exchange extends Exchan
 
     void close() throws AMQException;
 
-
+    /**
+     * Returns a list of queues to which to route this message.   If there are
+     * no queues the empty list must be returned.
+     *
+     * @return list of queues to which to route the message.
+     */
     ArrayList<? extends BaseQueue> route(InboundMessage message);
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties Thu Sep 15 12:45:49 2011
@@ -21,4 +21,5 @@
 # 0 - type
 # 1 - name
 CREATED = EXH-1001 : Create :[ Durable] Type: {0} Name: {1}
-DELETED = EXH-1002 : Deleted
\ No newline at end of file
+DELETED = EXH-1002 : Deleted
+DISCARDMSG = EXH-1003 : Discarded Message : Name: {0} Routing Key: {1}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Thu Sep 15 12:45:49 2011
@@ -34,7 +34,7 @@ import org.apache.qpid.transport.codec.B
 import java.nio.ByteBuffer;
 import java.lang.ref.SoftReference;
 
-public class MessageMetaData_0_10 implements StorableMessageMetaData
+public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
 {
     private Header _header;
     private DeliveryProperties _deliveryProps;
@@ -194,6 +194,12 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
     }
 
+    public boolean isRedelivered()
+    {
+        // The *Message* is never redelivered, only queue entries are...
+        return false;
+    }
+
     public long getArrivalTime()
     {
         return _arrivalTime;
@@ -239,4 +245,6 @@ public class MessageMetaData_0_10 implem
 
         }
     }
+
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Sep 15 12:45:49 2011
@@ -40,6 +40,7 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager_0_10;
 import org.apache.qpid.server.flow.WindowCreditManager;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -167,7 +168,6 @@ public class ServerSessionDelegate exten
     @Override
     public void messageSubscribe(Session session, MessageSubscribe method)
     {
-
         //TODO - work around broken Python tests
         if(!method.hasAcceptMode())
         {
@@ -284,25 +284,10 @@ public class ServerSessionDelegate exten
         }
     }
 
-
     @Override
     public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
-        ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
-        Exchange exchange;
-        if(xfr.hasDestination())
-        {
-            exchange = exchangeRegistry.getExchange(xfr.getDestination());
-            if(exchange == null)
-            {
-                exchange = exchangeRegistry.getDefaultExchange();
-            }
-        }
-        else
-        {
-            exchange = exchangeRegistry.getDefaultExchange();
-        }
-        
+        final Exchange exchange = getExchangeForMessage(ssn, xfr);
 
         DeliveryProperties delvProps = null;
         if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -310,7 +295,7 @@ public class ServerSessionDelegate exten
             delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
         }
 
-        MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+        final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
         
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
@@ -320,66 +305,65 @@ public class ServerSessionDelegate exten
             
             return;
         }
-        
-        final MessageStore store = getVirtualHost(ssn).getMessageStore();
-        StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
-        ByteBuffer body = xfr.getBody();
-        if(body != null)
+
+        final Exchange exchangeInUse;
+        ArrayList<? extends BaseQueue> queues = exchange.route(messageMetaData);
+        if(queues.isEmpty() && exchange.getAlternateExchange() != null)
+        {
+            final Exchange alternateExchange = exchange.getAlternateExchange();
+            queues = alternateExchange.route(messageMetaData);
+            if (!queues.isEmpty())
+            {
+                exchangeInUse = alternateExchange;
+            }
+            else
+            {
+                exchangeInUse = exchange;
+            }
+        }
+        else
         {
-            storeMessage.addContent(0, body);
+            exchangeInUse = exchange;
         }
-        storeMessage.flushToStore();
-        MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
-
-        ArrayList<? extends BaseQueue> queues = exchange.route(message);
 
-
-
-        if(queues != null && queues.size() != 0)
+        if(!queues.isEmpty())
         {
+            final MessageStore store = getVirtualHost(ssn).getMessageStore();
+            final StoredMessage<MessageMetaData_0_10> storeMessage = createAndFlushStoreMessage(xfr, messageMetaData, store);
+            MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
             ((ServerSession) ssn).enqueue(message, queues);
         }
         else
         {
-            if(delvProps == null || !delvProps.hasDiscardUnroutable() || !delvProps.getDiscardUnroutable())
+            if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
             {
-                if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
-                {
-                    RangeSet rejects = new RangeSet();
-                    rejects.add(xfr.getId());
-                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
-                    ssn.invoke(reject);
-                }
-                else
-                {
-                    Exchange alternate = exchange.getAlternateExchange();
-                    if(alternate != null)
-                    {
-                        queues = alternate.route(message);
-                        if(queues != null && queues.size() != 0)
-                        {
-                            ((ServerSession) ssn).enqueue(message, queues);
-                        }
-                        else
-                        {
-                            //TODO - log the message discard
-                        }
-                    }
-                    else
-                    {
-                        //TODO - log the message discard
-                    }
-
-
-                }
+                RangeSet rejects = new RangeSet();
+                rejects.add(xfr.getId());
+                MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+                ssn.invoke(reject);
+            }
+            else
+            {
+                ((ServerSession) ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
             }
-
-
         }
 
         ssn.processed(xfr);
     }
 
+    private StoredMessage<MessageMetaData_0_10> createAndFlushStoreMessage(final MessageTransfer xfr,
+            final MessageMetaData_0_10 messageMetaData, final MessageStore store)
+    {
+        final StoredMessage<MessageMetaData_0_10> storeMessage = store.addMessage(messageMetaData);
+        ByteBuffer body = xfr.getBody();
+        if(body != null)
+        {
+            storeMessage.addContent(0, body);
+        }
+        storeMessage.flushToStore();
+        return storeMessage;
+    }
+
     @Override
     public void messageCancel(Session session, MessageCancel method)
     {
@@ -582,6 +566,25 @@ public class ServerSessionDelegate exten
 
     }
 
+    private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+    {
+        final ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+        Exchange exchange;
+        if(xfr.hasDestination())
+        {
+            exchange = exchangeRegistry.getExchange(xfr.getDestination());
+            if(exchange == null)
+            {
+                exchange = exchangeRegistry.getDefaultExchange();
+            }
+        }
+        else
+        {
+            exchange = exchangeRegistry.getDefaultExchange();
+        }
+        return exchange;
+    }
+
     private VirtualHost getVirtualHost(Session session)
     {
         ServerConnection conn = getServerConnection(session);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java Thu Sep 15 12:45:49 2011
@@ -66,7 +66,6 @@ public class ExchangeMessagesTest extend
         validateLogMessage(log, "EXH-1001", expected);
     }
 
-
     public void testExchangeDeleted()
     {
         _logMessage = ExchangeMessages.DELETED();
@@ -77,4 +76,21 @@ public class ExchangeMessagesTest extend
         validateLogMessage(log, "EXH-1002", expected);
     }
 
+    public void testExchangeDiscardedMessage()
+    {
+        // Get the Default Exchange on the Test Vhost for testing
+        final Exchange exchange = ApplicationRegistry.getInstance().
+                getVirtualHostRegistry().getVirtualHost("test").
+                getExchangeRegistry().getDefaultExchange();
+
+        final String name = exchange.getNameShortString().toString();
+        final String routingKey = "routingKey";
+
+        _logMessage = ExchangeMessages.DISCARDMSG(name, routingKey);
+        List<Object> log = performLog();
+
+        String[] expected = {"Discarded Message :","Name:", name, "Routing Key:", routingKey};
+
+        validateLogMessage(log, "EXH-1003", expected);
+    }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java?rev=1171079&r1=1171078&r2=1171079&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ExchangeLoggingTest.java Thu Sep 15 12:45:49 2011
@@ -20,6 +20,16 @@
  */
 package org.apache.qpid.server.logging;
 
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.framing.AMQFrame;
@@ -28,13 +38,6 @@ import org.apache.qpid.framing.ExchangeD
 import org.apache.qpid.framing.ExchangeDeleteOkBody;
 import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.Session;
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Exchange
  *
@@ -214,4 +217,38 @@ public class ExchangeLoggingTest extends
 
     }
 
+    public void testDiscardedMessage() throws Exception
+    {
+        //Ignore broker startup messages
+        _monitor.reset();
+
+        if (!isBroker010())
+        {
+            // Default 0-8..-0-9-1 behaviour is for messages to be rejected (returned to client).
+            setTestClientSystemProperty("qpid.default_mandatory", "false");
+        }
+
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Do not create consumer so queue is not created and message will be discarded.
+        final MessageProducer producer = _session.createProducer(_queue);
+
+        // Sending message
+        final TextMessage msg = _session.createTextMessage("msg");
+        producer.send(msg);
+
+        final String expectedMessageBody = "Discarded Message : Name: " + _name + " Routing Key: " + _queue.getQueueName();
+
+        // Ensure we have received the EXH log msg.
+        waitForMessage("EXH-1003");
+
+        List<String> results = findMatches(EXH_PREFIX);
+        assertEquals("Result set larger than expected.", 2, results.size());
+
+        final String log = getLogMessage(results, 1);
+        validateMessageID("EXH-1003", log);
+
+        final String message = getMessageString(fromMessage(log));
+        assertEquals("Log Message not as expected", expectedMessageBody, message);
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org