You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/02/19 14:48:18 UTC

svn commit: r509202 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ test/java/org/apache/qpid/client/ test/java/org/apache/qpid/test/unit/client/channelclose/

Author: ritchiem
Date: Mon Feb 19 05:48:17 2007
New Revision: 509202

URL: http://svn.apache.org/viewvc?view=rev&rev=509202
Log:
QPID-379 Bounced messages do not appear in the connection exception listener.

The previous commit, which started the Dispatcher, was wrong and caused a lot of failures. This commit addresses that problem by providing a thread pool on the client connection object that delivers bounced messages to the exception handler.

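Tidied up the MessageListener tests (MessageListenerTest and MessageListenerMultiConsumerTest) so that each test's asserts are in the test method itself rather than in tearDown().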

Renamed TestChannelCloseMethodHandlerNoCloseOk to ChannelCloseMethodHandlerNoCloseOk, because surefire picks up the old name as a test case.


Added:
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb 19 05:48:17 2007
@@ -62,6 +62,9 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
@@ -144,6 +147,9 @@
     private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
     private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
 
+    /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
+    private final ExecutorService _taskPool = Executors.newCachedThreadPool();
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -716,8 +722,31 @@
             {
                 try
                 {
+                    long startCloseTime = System.currentTimeMillis();
+
+                    _taskPool.shutdown();
                     closeAllSessions(null, timeout);
+
+                    if (!_taskPool.isTerminated())
+                    {
+                        try
+                        {
+                            //adjust timeout
+                            long taskPoolTimeout = adjustTimeout(timeout, startCloseTime);
+
+                            _taskPool.awaitTermination(taskPoolTimeout , TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            _logger.info("Interrupted while shutting down connection thread pool.");
+                        }
+                    }
+
+                    //adjust timeout
+                    timeout = adjustTimeout(timeout, startCloseTime);
+
                     _protocolHandler.closeConnection(timeout);
+
                 }
                 catch (AMQException e)
                 {
@@ -727,6 +756,17 @@
         }
     }
 
+    private long adjustTimeout(long timeout, long startTime)
+    {
+        long now = System.currentTimeMillis();
+        timeout -= now - startTime;
+        if (timeout < 0)
+        {
+            timeout = 0;
+        }
+        return timeout;
+    }
+
     /**
      * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
      * mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -1146,5 +1186,10 @@
     public void setTemporaryQueueExchangeName(AMQShortString temporaryQueueExchangeName)
     {
         _temporaryQueueExchangeName = temporaryQueueExchangeName;
+    }
+
+    public void performConnectionTask(Runnable task)
+    {
+        _taskPool.execute(task);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 19 05:48:17 2007
@@ -72,7 +72,6 @@
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
@@ -192,7 +191,6 @@
 
     private boolean _hasMessageListeners;
 
-
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
 
     private class Dispatcher extends Thread
@@ -277,42 +275,6 @@
 
                 }
             }
-            else
-            {
-                try
-                {
-                    // Bounced message is processed here, away from the mina thread
-                    AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
-                                                                                              false,
-                                                                                              message.getBounceBody().exchange,
-                                                                                              message.getBounceBody().routingKey,
-                                                                                              message.getContentHeader(),
-                                                                                              message.getBodies());
-
-                    AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
-                    AMQShortString reason = message.getBounceBody().replyText;
-                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
-                    //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS)
-                    {
-                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE)
-                    {
-                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
-                    }
-                    else
-                    {
-                        _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
-                    }
-
-                }
-                catch (Exception e)
-                {
-                    _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
-                }
-            }
         }
 
         public void close()
@@ -1384,7 +1346,7 @@
 
         if (topicName.indexOf('/') == -1)
         {
-            return new AMQTopic(getDefaultTopicExchangeName(),new AMQShortString(topicName));
+            return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName));
         }
         else
         {
@@ -1474,8 +1436,8 @@
             }
             // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
             // says we must trash the subscription.
-            if (isQueueBound(dest.getExchangeName(),dest.getAMQQueueName()) &&
-                !isQueueBound(dest.getExchangeName(),dest.getAMQQueueName(), topicName))
+            if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+                !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
             {
                 deleteQueue(dest.getAMQQueueName());
             }
@@ -1634,9 +1596,59 @@
                           + "] received in session with channel id " + _channelId);
         }
 
-        startDistpatcherIfNecessary();
+        if (message.getDeliverBody() == null)
+        {
+            // Return of the bounced message.
+            returnBouncedMessage(message);
+        }
+        else
+        {
+            _queue.add(message);
+        }
+    }
+
+    private void returnBouncedMessage(final UnprocessedMessage message)
+    {
+        _connection.performConnectionTask(
+                new Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            // Bounced message is processed here, away from the mina thread
+                            AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
+                                                                                                      false,
+                                                                                                      message.getBounceBody().exchange,
+                                                                                                      message.getBounceBody().routingKey,
+                                                                                                      message.getContentHeader(),
+                                                                                                      message.getBodies());
+
+                            AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode);
+                            AMQShortString reason = message.getBounceBody().replyText;
+                            _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+                            //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+                            if (errorCode == AMQConstant.NO_CONSUMERS)
+                            {
+                                _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
+                            }
+                            else if (errorCode == AMQConstant.NO_ROUTE)
+                            {
+                                _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+                            }
+                            else
+                            {
+                                _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+                            }
 
-        _queue.add(message);
+                        }
+                        catch (Exception e)
+                        {
+                            _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+                        }
+                    }
+                });
     }
 
     /**
@@ -1882,7 +1894,7 @@
         {
             throw new javax.jms.InvalidDestinationException("Cannot create a subscription on a temporary topic created in another session");
         }
-        if(!(topic instanceof AMQTopic))
+        if (!(topic instanceof AMQTopic))
         {
             throw new javax.jms.InvalidDestinationException("Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: " + topic.getClass().getName());
         }
@@ -1915,7 +1927,6 @@
     {
         return _connection.getTemporaryQueueExchangeName();
     }
-
 
 
     public int getTicket()

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Mon Feb 19 05:48:17 2007
@@ -42,16 +42,13 @@
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 
 /**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
  */
 public class MessageListenerMultiConsumerTest extends TestCase
 {
@@ -66,7 +63,6 @@
     private MessageConsumer _consumer1;
     private MessageConsumer _consumer2;
 
-    private boolean _testAsync;
     private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
 
     protected void setUp() throws Exception
@@ -116,16 +112,10 @@
 
         producerConnection.close();
 
-        _testAsync = false;
     }
 
     protected void tearDown() throws Exception
     {
-        //Should have recieved all async messages
-        if (_testAsync)
-        {
-            assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
-        }
         _clientConnection.close();
 
         super.tearDown();
@@ -161,8 +151,6 @@
 
     public void testAsynchronousRecieve() throws Exception
     {
-        _testAsync = true;
-
         _consumer1.setMessageListener(new MessageListener()
         {
             public void onMessage(Message message)
@@ -173,7 +161,7 @@
 
                 if (receivedCount1 == MSG_COUNT / 2)
                 {
-                    _allMessagesSent.countDown();
+                    _allMessagesSent.countDown();                    
                 }
 
             }
@@ -198,13 +186,14 @@
 
         try
         {
-            _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+            _allMessagesSent.await(4000, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {
             //do nothing
         }
 
+        assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
     }
 
 

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Mon Feb 19 05:48:17 2007
@@ -21,6 +21,8 @@
 package org.apache.qpid.client;
 
 import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -38,18 +40,17 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
 
 /**
- * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
- * <p/>
- * The message delivery process:
- * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
- * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
- * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
- * session can run in any order and a synchronous put/poll will block the dispatcher).
- * <p/>
- * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
- * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery
+ * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread
+ * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at
+ * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple
+ * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting
+ * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
+ * messages will be left on the queue and lost, subsequent messages on the session will arrive first.
  */
 public class MessageListenerTest extends TestCase implements MessageListener
 {
@@ -61,7 +62,7 @@
     private int receivedCount = 0;
     private MessageConsumer _consumer;
     private Connection _clientConnection;
-    private boolean _testAsync;
+    private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
 
     protected void setUp() throws Exception
     {
@@ -71,9 +72,9 @@
         InitialContextFactory factory = new PropertiesFileInitialContextFactory();
 
         Hashtable<String, String> env = new Hashtable<String, String>();
-        
+
         env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
-        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+        env.put("queue.queue", "MessageListenerTest");
 
         _context = factory.getInitialContext(env);
 
@@ -86,7 +87,6 @@
 
         Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-
         _consumer = clientSession.createConsumer(queue);
 
         //Create Producer
@@ -106,16 +106,10 @@
 
         producerConnection.close();
 
-        _testAsync = false;
     }
 
     protected void tearDown() throws Exception
     {
-        //Should have recieved all async messages
-        if (_testAsync)
-        {
-            assertEquals(MSG_COUNT, receivedCount);
-        }
         _clientConnection.close();
 
         super.tearDown();
@@ -125,7 +119,6 @@
 
     public void testSynchronousRecieve() throws Exception
     {
-
         for (int msg = 0; msg < MSG_COUNT; msg++)
         {
             assertTrue(_consumer.receive(2000) != null);
@@ -134,21 +127,20 @@
 
     public void testAsynchronousRecieve() throws Exception
     {
-        _testAsync = true;
-
         _consumer.setMessageListener(this);
 
-
         _logger.info("Waiting 3 seconds for messages");
 
         try
         {
-            Thread.sleep(2000);
+            _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
         }
         catch (InterruptedException e)
         {
             //do nothing
         }
+        //Should have recieved all async messages
+        assertEquals(MSG_COUNT, receivedCount);
 
     }
 
@@ -157,6 +149,7 @@
         _logger.info("Received Message(" + receivedCount + "):" + message);
 
         receivedCount++;
+        _awaitMessages.countDown();
     }
 
     public static junit.framework.Test suite()

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?view=auto&rev=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Mon Feb 19 05:48:17 2007
@@ -0,0 +1,96 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQShortString;
+
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
+
+    private static ChannelCloseMethodHandlerNoCloseOk _handler = new ChannelCloseMethodHandlerNoCloseOk();
+
+    public static ChannelCloseMethodHandlerNoCloseOk getInstance()
+    {
+        return _handler;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    {
+        _logger.debug("ChannelClose method received");
+        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+
+        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
+        AMQShortString reason = method.replyText;
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+        }
+
+        // For this test Method Handler .. don't send Close-OK
+//        // TODO: Be aware of possible changes to parameter order as versions change.
+//        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
+//        protocolSession.writeFrame(frame);
+        if (errorCode != AMQConstant.REPLY_SUCCESS)
+        {
+            _logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
+            if (errorCode == AMQConstant.NO_CONSUMERS)
+            {
+                throw new AMQNoConsumersException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.NO_ROUTE)
+            {
+                throw new AMQNoRouteException("Error: " + reason, null);
+            }
+            else if (errorCode == AMQConstant.INVALID_SELECTOR)
+            {
+                _logger.debug("Broker responded with Invalid Selector.");
+
+                throw new AMQInvalidSelectorException(String.valueOf(reason));
+            }
+            else if (errorCode == AMQConstant.INVALID_ROUTING_KEY)
+            {
+                _logger.debug("Broker responded with Invalid Routing Key.");
+
+                throw new AMQInvalidRoutingKeyException(String.valueOf(reason));
+            }
+            else
+            {
+                throw new AMQChannelClosedException(errorCode, "Error: " + reason);
+            }
+
+        }
+        protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?view=diff&rev=509202&r1=509201&r2=509202
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java Mon Feb 19 05:48:17 2007
@@ -27,7 +27,6 @@
 import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
 import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
 import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
 import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
 import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
 import org.apache.qpid.client.handler.BasicReturnMethodHandler;
@@ -91,7 +90,7 @@
         //
         frame2handlerMap = new HashMap();
         // Use Test Handler for Close methods to not send Close-OKs
-        frame2handlerMap.put(ChannelCloseBody.class, TestChannelCloseMethodHandlerNoCloseOk.getInstance());
+        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
 
         frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
         frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());