You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/14 13:57:45 UTC

svn commit: r637086 - in /incubator/qpid/branches/M2.1/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/client/message/ systests/src/main/java/org/apache/qpid/test/client/

Author: ritchiem
Date: Fri Mar 14 05:57:42 2008
New Revision: 637086

URL: http://svn.apache.org/viewvc?rev=637086&view=rev
Log:
QPID-854 : Changes to the client to make the dispatcher responsible for closing the queue browser when all the messages have been processed.

Added:
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java   (with props)
Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Fri Mar 14 05:57:42 2008
@@ -22,6 +22,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -50,7 +51,9 @@
         _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
         // Create Consumer to verify message selector.
         BasicMessageConsumer consumer =
-            (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+                (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+        // Close this consumer: we are not looking to consume, only to establish that, at least for now,
+        // the QueueBrowser (QB) can be created.
         consumer.close();
     }
 
@@ -88,40 +91,40 @@
         checkState();
         final BasicMessageConsumer consumer =
             (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
-        consumer.closeWhenNoMessages(true);
+
         _consumers.add(consumer);
 
         return new Enumeration()
+        {
+
+            Message _nextMessage = consumer == null ? null : consumer.receive();
+
+            public boolean hasMoreElements()
             {
+                _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
 
-                Message _nextMessage = consumer.receive();
+                return (_nextMessage != null);
+            }
 
-                public boolean hasMoreElements()
+            public Object nextElement()
+            {
+                Message msg = _nextMessage;
+                try
                 {
-                    _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+                    _logger.info("QB:nextElement about to receive");
 
-                    return (_nextMessage != null);
+                    _nextMessage = consumer.receive();
+                    _logger.info("QB:nextElement received:" + _nextMessage);
                 }
-
-                public Object nextElement()
+                catch (JMSException e)
                 {
-                    Message msg = _nextMessage;
-                    try
-                    {
-                        _logger.info("QB:nextElement about to receive");
-
-                        _nextMessage = consumer.receive();
-                        _logger.info("QB:nextElement received:" + _nextMessage);
-                    }
-                    catch (JMSException e)
-                    {
-                        _logger.warn("Exception caught while queue browsing", e);
-                        _nextMessage = null;
-                    }
-
-                    return msg;
+                    _logger.warn("Exception caught while queue browsing", e);
+                    _nextMessage = null;
                 }
-            };
+
+                return msg;
+            }
+        };
     }
 
     public void close() throws JMSException

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Mar 14 05:57:42 2008
@@ -648,6 +648,13 @@
      */
     public void closed(Throwable e) throws JMSException
     {
+        // This method needs to be improved. Throwables only arrive here from the mina : exceptionReceived
+        // calls through connection.closeAllSessions, which is also called by the public connection.close()
+        // with a null cause.
+        // When we are closing the Session due to a protocol session error we simply create a new AMQException
+        // with the correct error code and text. This is clearly WRONG, as the instanceof check below will fail.
+        // We need to determine here if the connection should be
+
         synchronized (_connection.getFailoverMutex())
         {
             if (e instanceof AMQDisconnectedException)
@@ -763,13 +770,7 @@
         BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
         if (consumer != null)
         {
-            // fixme this isn't right.. needs to check if _queue contains data for this consumer
-            if (consumer.isAutoClose()) // && _queue.isEmpty())
-            {
-                consumer.closeWhenNoMessages(true);
-            }
-
-            if (!consumer.isNoConsume())
+            if (!consumer.isNoConsume())  // Normal Consumer
             {
                 // Clean the Maps up first
                 // Flush any pending messages for this consumerTag
@@ -785,7 +786,7 @@
 
                 _dispatcher.rejectPending(consumer);
             }
-            else
+            else // Queue Browser
             {
                 // Just close the consumer
                 // fixme  the CancelOK is being processed before the arriving messages..
@@ -793,13 +794,28 @@
                 // has yet to receive before the close comes in.
 
                 // consumer.markClosed();
+
+
+
+                if (consumer.isAutoClose())
+                {     // There is a small window where the message is between the two queues in the dispatcher.
+                    if (consumer.isClosed())
+                    {
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Closing consumer:" + consumer.debugIdentity());
+                        }
+
+                        deregisterConsumer(consumer);
+
+                    }
+                    else
+                    {
+                        _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+                    }
+                }
             }
         }
-        else
-        {
-            _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
-        }
-
     }
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -2934,7 +2950,8 @@
                                 _lock.wait(2000);
                             }
 
-                            if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())
+                            if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+                                && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()))
                             {
                                 rejectMessage(message, true);
                             }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Mar 14 05:57:42 2008
@@ -38,6 +38,7 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -121,7 +122,6 @@
      * on the queue.  This is used for queue browsing.
      */
     private final boolean _autoClose;
-    private boolean _closeWhenNoMessages;
 
     private final boolean _noConsume;
     private List<StackTraceElement> _closedStack = null;
@@ -358,7 +358,7 @@
         }
         catch (InterruptedException e)
         {
-            _logger.warn("Interrupted: " + e);
+            _logger.warn("Interrupted acquire: " + e);
             if (isClosed())
             {
                 return null;
@@ -369,11 +369,6 @@
 
         try
         {
-            if (closeOnAutoClose())
-            {
-                return null;
-            }
-
             Object o = null;
             if (l > 0)
             {
@@ -386,7 +381,7 @@
                     }
                     catch (InterruptedException e)
                     {
-                        _logger.warn("Interrupted: " + e);
+                        _logger.warn("Interrupted poll: " + e);
                         if (isClosed())
                         {
                             return null;
@@ -404,7 +399,7 @@
                     }
                     catch (InterruptedException e)
                     {
-                        _logger.warn("Interrupted: " + e);
+                        _logger.warn("Interrupted take: " + e);
                         if (isClosed())
                         {
                             return null;
@@ -426,20 +421,6 @@
         }
     }
 
-    private boolean closeOnAutoClose() throws JMSException
-    {
-        if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
-        {
-            close(false);
-
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
     public Message receiveNoWait() throws JMSException
     {
         checkPreConditions();
@@ -468,11 +449,6 @@
 
         try
         {
-            if (closeOnAutoClose())
-            {
-                return null;
-            }
-
             Object o = _synchronousQueue.poll();
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
@@ -513,6 +489,12 @@
 
             throw e;
         }
+        else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+        {
+            _closed.set(true);
+            deregisterConsumer();
+            return null;
+        }
         else
         {
             return (AbstractJMSMessage) o;
@@ -526,31 +508,30 @@
 
     public void close(boolean sendClose) throws JMSException
     {
-        // synchronized (_closed)
-
         if (_logger.isInfoEnabled())
         {
             _logger.info("Closing consumer:" + debugIdentity());
         }
 
-        synchronized (_connection.getFailoverMutex())
+        if (!_closed.getAndSet(true))
         {
-            if (!_closed.getAndSet(true))
+            if (_logger.isDebugEnabled())
             {
-                if (_logger.isDebugEnabled())
+                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+                if (_closedStack != null)
                 {
-                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
-                    if (_closedStack != null)
-                    {
-                        _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
-                    }
-                    else
-                    {
-                        _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
-                    }
+                    _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
                 }
+                else
+                {
+                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+                }
+            }
 
-                if (sendClose)
+            if (sendClose)
+            {
+                // The Synchronized block only needs to protect network traffic.
+                synchronized (_connection.getFailoverMutex())
                 {
                     BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
 
@@ -564,7 +545,6 @@
                         {
                             _logger.debug("CancelOk'd for consumer:" + debugIdentity());
                         }
-
                     }
                     catch (AMQException e)
                     {
@@ -575,24 +555,26 @@
                         throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
                     }
                 }
-                else
-                {
-                    // //fixme this probably is not right
-                    // if (!isNoConsume())
-                    { // done in BasicCancelOK Handler but not sending one so just deregister.
-                        deregisterConsumer();
-                    }
+            }
+            else
+            {
+                // //fixme this probably is not right
+                // if (!isNoConsume())
+                { // done in BasicCancelOK Handler but not sending one so just deregister.
+                    deregisterConsumer();
                 }
+            }
 
-                if ((_messageListener != null) && _receiving.get())
+            // This will occur if session.close is called, closing all consumers: we may be blocked waiting for a
+            // receive, so we need to interrupt the receiving thread to let it know it is time to close.
+            if ((_messageListener != null) && _receiving.get())
+            {
+                if (_logger.isInfoEnabled())
                 {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Interrupting thread: " + _receivingThread);
-                    }
-
-                    _receivingThread.interrupt();
+                    _logger.info("Interrupting thread: " + _receivingThread);
                 }
+
+                _receivingThread.interrupt();
             }
         }
     }
@@ -634,6 +616,12 @@
      */
     void notifyMessage(UnprocessedMessage messageFrame)
     {
+        if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+        {
+            notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+            return;
+        }
+
         final boolean debug = _logger.isDebugEnabled();
 
         if (debug)
@@ -646,12 +634,12 @@
             final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
 
             AbstractJMSMessage jmsMessage =
-                _messageFactory.createMessage(deliverBody.getDeliveryTag(),
-                                              deliverBody.getRedelivered(),
-                                              deliverBody.getExchange(),
-                                              deliverBody.getRoutingKey(),
-                                              messageFrame.getContentHeader(),
-                                              messageFrame.getBodies());
+                    _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+                                                  deliverBody.getRedelivered(),
+                                                  deliverBody.getExchange(),
+                                                  deliverBody.getRoutingKey(),
+                                                  messageFrame.getContentHeader(),
+                                                  messageFrame.getBodies());
 
             if (debug)
             {
@@ -688,9 +676,32 @@
         }
     }
 
-    /**
-     * @param jmsMessage this message has already been processed so can't redo preDeliver
-     */
+    /** @param closeMessage this message signals that we should close the browser */
+    public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+    {
+        if (isMessageListenerSet())
+        {
+            // Currently only possible to get this msg type with a browser.
+            // If we get the message here then we should probably just close this consumer.
+            // Though an AutoClose consumer with message listener is quite odd...
+            // Just log out the fact so we know where we are
+            _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+        }
+        else
+        {
+            try
+            {
+                _synchronousQueue.put(closeMessage);
+            }
+            catch (InterruptedException e)
+            {
+                _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," +
+                             "but we shouldn't have close yet");
+            }
+        }
+    }
+
+    /** @param jmsMessage this message has already been processed so can't redo preDeliver */
     public void notifyMessage(AbstractJMSMessage jmsMessage)
     {
         try
@@ -911,18 +922,6 @@
     public boolean isNoConsume()
     {
         return _noConsume;
-    }
-
-    public void closeWhenNoMessages(boolean b)
-    {
-        _closeWhenNoMessages = b;
-
-        if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
-        {
-            _closed.set(true);
-            _receivingThread.interrupt();
-        }
-
     }
 
     public void rollback()

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Mar 14 05:57:42 2008
@@ -104,6 +104,11 @@
         }
         // fixme why is this only done when the close is expected...
         // should the above forced closes not also cause a close?
+        // ----------
+        // Closing the session only when it is expected allows the errors to be processed
+        // Calling this here will prevent failover. So we should do this for all exceptions
+        // that should never cause failover. Such as authentication errors.
+
         session.channelClosed(channelId, errorCode, String.valueOf(reason));
     }
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Fri Mar 14 05:57:42 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,10 +24,20 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicDeliverBody;
 import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -128,31 +138,159 @@
     }
 
     public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+    {
+        private final BasicReturnBody _body;
+
+        public UnprocessedBouncedMessage(final BasicReturnBody body)
         {
-            private final BasicReturnBody _body;
+            _body = body;
+        }
 
-            public UnprocessedBouncedMessage(final BasicReturnBody body)
-            {
-                _body = body;
-            }
 
+        public BasicDeliverBody getDeliverBody()
+        {
+            return null;
+        }
 
-            public BasicDeliverBody getDeliverBody()
-            {
-                return null;
-            }
+        public BasicReturnBody getBounceBody()
+        {
+            return _body;
+        }
 
-            public BasicReturnBody getBounceBody()
-            {
-                return _body;
-            }
+        public boolean isDeliverMessage()
+        {
+            return false;
+        }
+    }
+
+    public static final class CloseConsumerMessage extends UnprocessedMessage
+    {
+        BasicMessageConsumer _consumer;
 
-            public boolean isDeliverMessage()
+        public CloseConsumerMessage(BasicMessageConsumer consumer)
+        {
+            _consumer = consumer;
+        }
+
+        public BasicDeliverBody getDeliverBody()
+        {
+            return new BasicDeliverBody()
             {
-                return false;
-            }
+                // This is the only thing we need to preserve so the correct consumer can be found later.
+                public AMQShortString getConsumerTag()
+                {
+                    return _consumer.getConsumerTag();
+                }
+
+                // The Rest of these methods are not used
+                public long getDeliveryTag()
+                {
+                    return 0;
+                }
+
+                public AMQShortString getExchange()
+                {
+                    return null;
+                }
+
+                public boolean getRedelivered()
+                {
+                    return false;
+                }
+
+                public AMQShortString getRoutingKey()
+                {
+                    return null;
+                }
+
+                public byte getMajor()
+                {
+                    return 0;
+                }
+
+                public byte getMinor()
+                {
+                    return 0;
+                }
+
+                public int getClazz()
+                {
+                    return 0;
+                }
+
+                public int getMethod()
+                {
+                    return 0;
+                }
+
+                public void writeMethodPayload(ByteBuffer buffer)
+                {
+                }
+
+                public byte getFrameType()
+                {
+                    return 0;
+                }
+
+                public int getSize()
+                {
+                    return 0;
+                }
+
+                public void writePayload(ByteBuffer buffer)
+                {
+                }
+
+                public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException
+                {
+                }
+
+                public AMQFrame generateFrame(int channelId)
+                {
+                    return null;
+                }
+
+                public AMQChannelException getChannelNotFoundException(int channelId)
+                {
+                    return null;
+                }
+
+                public AMQChannelException getChannelException(AMQConstant code, String message)
+                {
+                    return null;
+                }
+
+                public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+                {
+                    return null;
+                }
+
+                public AMQConnectionException getConnectionException(AMQConstant code, String message)
+                {
+                    return null;
+                }
+
+                public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+                {
+                    return null;
+                }
+
+                public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+                {
+                    return false;
+                }
+            };
         }
 
+        public BasicReturnBody getBounceBody()
+        {
+            return null;
+        }
 
+        public boolean isDeliverMessage()
+        {
+            return false;
+        }
+    }
 
 }

Added: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=637086&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (added)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Fri Mar 14 05:57:42 2008
@@ -0,0 +1,110 @@
+package org.apache.qpid.test.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import java.util.Enumeration;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class CancelTest extends VMTestCase
+{
+    private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class);
+
+    private Connection _clientConnection;
+    private Session _clientSession;
+    private Queue _queue;
+
+    public void setUp() throws Exception
+    {
+
+        super.setUp();
+
+        _queue = (Queue) _context.lookup("queue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+    }
+
+    /**
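+     * Sends one message, browses the queue until the enumeration is exhausted, closes the browser, then receives from the queue with a normal consumer.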
+     */
+    public void test() throws JMSException, NamingException
+    {
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+        producer.send(producerSession.createTextMessage());
+        producerConnection.close();
+
+
+        QueueBrowser browser = _clientSession.createBrowser(_queue);
+        Enumeration e = browser.getEnumeration();
+
+
+        while (e.hasMoreElements())
+        {
+            e.nextElement();
+        }
+
+        browser.close();
+
+        MessageConsumer consumer = _clientSession.createConsumer(_queue);
+        consumer.receive();
+        consumer.close();
+    }
+
+    public void loop()
+    {
+        try
+        {
+            int run = 0;
+            while (true)
+            {
+                System.err.println(run++);
+                test();
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.error(e, e);
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java?rev=637086&r1=637085&r2=637086&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java Fri Mar 14 05:57:42 2008
@@ -14,14 +14,17 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.test.client;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.AMQException;
 
 import javax.jms.Queue;
 import javax.jms.ConnectionFactory;
@@ -88,7 +91,6 @@
 
     private void checkQueueDepth(int depth) throws JMSException, NamingException
     {
-        sendMessages(depth);
 
         // create QueueBrowser
         _logger.info("Creating Queue Browser");
@@ -101,6 +103,19 @@
             _logger.debug("Checking for " + depth + " messages with QueueBrowser");
         }
 
+        long queueDepth = 0;
+        // Ask the session for the queue's depth so it can be compared with the expected value.
+        try
+        {
+            queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+        }
+        catch (AMQException e)
+        {
+            // Fail explicitly: swallowing this left queueDepth at 0, which let a depth-0 check pass.
+            fail("Unable to query queue depth from session: " + e);
+        }
+        assertEquals("Session reports Queue depth not as expected", depth, queueDepth);
+
         int msgCount = 0;
         Enumeration msgs = queueBrowser.getEnumeration();
 
@@ -116,11 +131,7 @@
         }
 
         // check to see if all messages found
-//        assertEquals("browser did not find all messages", MSG_COUNT, msgCount);
-        if (msgCount != depth)
-        {
-            _logger.warn(msgCount + " off" + depth + " messages received.");
-        }
+        assertEquals("Browser did not find all messages", depth, msgCount);
 
         //Close browser
         queueBrowser.close();
@@ -132,39 +143,61 @@
      *
      */
 
-     public void testQueueBrowserMsgsRemainOnQueue() throws Exception
-     {
-         int messages = 10;
-
-         checkQueueDepth(messages);
-
-         // VERIFY
-
-         // continue and try to receive all messages
-         MessageConsumer consumer = _clientSession.createConsumer(_queue);
-
-         _logger.info("Verify messages are still on the queue");
-
-         Message tempMsg;
-
-         for (int msgCount = 0; msgCount < messages; msgCount++)
-         {
-             tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
-             if (tempMsg == null)
-             {
-                 fail("Message " + msgCount + " not retrieved from queue");
-             }
-         }
-
-         _logger.info("All messages recevied from queue");
-     }
-
-     /**
-      * This tests you can browse an empty queue, see QPID-785
-      * @throws Exception
-      */
-     public void testBrowsingEmptyQueue() throws Exception
-     {
-         checkQueueDepth(0);
-     }
+    public void testQueueBrowserMsgsRemainOnQueue() throws Exception // browse first, then consume
+    {
+        int messages = 10;
+
+        sendMessages(messages); // expected to put `messages` messages on the queue before it is checked
+
+        checkQueueDepth(messages); // asserts the reported depth and the browsed count both equal `messages`
+
+        // VERIFY: the messages that were browsed must still be deliverable to a consumer
+
+        // Consume from the same queue and expect to receive every message that was sent
+        MessageConsumer consumer = _clientSession.createConsumer(_queue);
+
+        _logger.info("Verify messages are still on the queue");
+
+        Message tempMsg;
+
+        for (int msgCount = 0; msgCount < messages; msgCount++)
+        {
+            tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+            if (tempMsg == null) // nothing arrived within RECEIVE_TIMEOUT
+            {
+                fail("Message " + msgCount + " not retrieved from queue");
+            }
+        }
+
+        consumer.close(); // close the consumer once all expected messages have been received
+
+        _logger.info("All messages recevied from queue");
+    }
+
+    /**
+     * Checks that an empty queue can be browsed (see QPID-785). This test sends nothing itself;
+     * checkQueueDepth(0) expects a reported depth of 0 and a browser that yields no messages.
+     * @throws Exception if checkQueueDepth throws
+     */
+    public void testBrowsingEmptyQueue() throws Exception
+    {
+        checkQueueDepth(0);
+    }
+
+    public void loop() throws JMSException // helper: reruns testQueueBrowserMsgsRemainOnQueue() until it throws
+    {
+        int run = 0; // iteration counter, printed in the banner before each pass
+        try
+        {
+            while (true) // no exit condition of its own
+            {
+                System.err.println(run++ + ":************************************************************************");
+                testQueueBrowserMsgsRemainOnQueue();
+            }
+        }
+        catch (Exception e)
+        {
+            _logger.error(e, e); // an Exception ends the loop; it is logged and not rethrown
+        }
+    }
 }