You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/17 14:31:49 UTC

svn commit: r497016 - in /incubator/qpid/branches/perftesting/qpid/java: broker/src/main/java/org/apache/qpid/server/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qpi...

Author: ritchiem
Date: Wed Jan 17 05:31:48 2007
New Revision: 497016

URL: http://svn.apache.org/viewvc?view=rev&rev=497016
Log:
Hand patched bug fixes from post persistence changes

Revision: 496661
Author: ritchiem
Date: 11:13:38, 16 January 2007
Message:
QPID-300
Updated BlockingMethodFrameListener so it passed FailoverExceptions without wrapping in AMQExceptions.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java

Revision: 496658
Author: rgreig
Date: 10:51:04, 16 January 2007
Message:
QPID-299 Messages not being correctly requeued when transacted session closed
----
Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Revision: 496641
Author: ritchiem
Date: 09:43:37, 16 January 2007
Message:
QPID-293
Added DispatcherCallback and MessageConsumerPair to allow Processed Messages to be returned to the consumer for redelivery whilst pausing the dispatcher.

AMQSession updated to create the callback and populate the queue.

Created two test cases that check the messages are correctly delivered with and without message listeners for 1 and 2 clients.

Minor non-JIRA related.
PropertiesFileInitialContextFactory dropped a warn log to info.
----
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
Added : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
Added : /incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java

Added:
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java   (with props)
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java   (with props)
Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
    incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Jan 17 05:31:48 2007
@@ -337,7 +337,8 @@
             }
         }
         unsubscribeAllConsumers(session);
-        requeue();
+        requeue();        
+        _txnBuffer.commit();
     }
 
     private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 05:31:48 2007
@@ -49,6 +49,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -92,6 +93,8 @@
      */
     private final FlowControllingBlockingQueue _queue;
 
+    private final java.util.Queue<MessageConsumerPair> _reprocessQueue;
+
     private Dispatcher _dispatcher;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
@@ -139,11 +142,32 @@
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
     /**
+     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+     */
+    private final AtomicBoolean _pausing = new AtomicBoolean(false);
+
+    /**
+     * Used to signal 'pausing' the dispatcher when setting a message listener on a consumer
+     */
+    private final AtomicBoolean _paused = new AtomicBoolean(false);
+
+    /**
      * Set when recover is called. This is to handle the case where recover() is called by application code
      * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
      */
     private boolean _inRecovery;
 
+    public void doDispatcherTask(DispatcherCallback dispatcherCallback)
+    {
+        synchronized (this)
+        {
+            _dispatcher.pause();
+
+            dispatcherCallback.whilePaused(_reprocessQueue);
+
+            _dispatcher.reprocess();
+        }
+    }
 
 
     /**
@@ -151,6 +175,8 @@
      */
     private class Dispatcher extends Thread
     {
+        private final Logger _logger = Logger.getLogger(Dispatcher.class);        
+
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -158,23 +184,105 @@
 
         public void run()
         {
-            UnprocessedMessage message;
             _stopped.set(false);
+
+            while (!_stopped.get())
+            {
+                if (_pausing.get())
+                {
+                    try
+                    {
+                        //Wait for unpausing
+                        synchronized (_pausing)
+                        {
+                            synchronized (_paused)
+                            {
+                                _paused.notify();
+                            }
+
+                            _logger.info("dispatcher paused");
+                            
+                            _pausing.wait();
+                            _logger.info("dispatcher notified");
+                        }
+
+                    }
+                    catch (InterruptedException e)
+                    {
+                        //do nothing... occurs when a pause request occurs will already
+                        // be here if another pause event is pending
+                        _logger.info("dispacher interrupted");
+                    }
+
+                    doReDispatch();
+
+                }
+                else
+                {
+                    doNormalDispatch();
+                }
+            }
+
+            _logger.info("Dispatcher thread terminating for channel " + _channelId);
+        }
+
+        private void doNormalDispatch()
+        {
+            UnprocessedMessage message;
             try
             {
-                while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+                while (!_stopped.get() && !_pausing.get() && (message = (UnprocessedMessage) _queue.take()) != null)
                 {
                     dispatchMessage(message);
                 }
             }
             catch (InterruptedException e)
             {
-                ;
+                _logger.info("dispatcher normal dispatch interrupted");
             }
 
-            _logger.info("Dispatcher thread terminating for channel " + _channelId);
         }
 
+        private void doReDispatch()
+        {
+            _logger.info("doRedispatching");
+
+            MessageConsumerPair messageConsumerPair;
+
+            if (_reprocessQueue != null)
+            {
+                _logger.info("Reprocess Queue has size:" + _reprocessQueue.size());
+                while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null))
+                {
+                    reDispatchMessage(messageConsumerPair);
+                }
+            }
+
+            if (_reprocessQueue == null || _reprocessQueue.isEmpty())
+            {
+                _logger.info("Reprocess Queue emptied");
+                _pausing.set(false);
+            }
+            else
+            {
+                _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size());
+            }
+
+        }
+
+        private void reDispatchMessage(MessageConsumerPair consumerPair)
+        {
+            if (consumerPair.getItem() instanceof AbstractJMSMessage)
+            {
+                _logger.info("do renotify:" + consumerPair.getItem());
+                consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId);
+            }
+
+            //    BasicMessageConsumer.notifyError(Throwable cause)
+            // will put the cause in to the list which could come out here... need to watch this.
+        }
+
+
         private void dispatchMessage(UnprocessedMessage message)
         {
             if (message.deliverBody != null)
@@ -235,6 +343,36 @@
             _stopped.set(true);
             interrupt();
         }
+
+        public void pause()
+        {
+            _logger.info("pausing");
+            _pausing.set(true);
+
+
+            interrupt();
+
+            synchronized (_paused)
+            {
+                try
+                {
+                    _paused.wait();
+                }
+                catch (InterruptedException e)
+                {
+                  //do nothing
+                }
+            }
+        }
+
+        public void reprocess()
+        {
+            synchronized (_pausing)
+            {
+                _logger.info("reprocessing");
+                _pausing.notify();
+            }
+        }
     }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
@@ -267,6 +405,8 @@
         _defaultPrefetchHighMark = defaultPrefetchHighMark;
         _defaultPrefetchLowMark = defaultPrefetchLowMark;
 
+        _reprocessQueue = new ConcurrentLinkedQueue<MessageConsumerPair>();
+
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
             _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
@@ -1583,7 +1723,7 @@
         //stop the server delivering messages to this session
         suspendChannel();
 
-//stop the dispatcher thread
+        //stop the dispatcher thread
         _stopped.set(true);
     }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 17 05:31:48 2007
@@ -39,6 +39,7 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.jms.Destination;
@@ -68,7 +69,7 @@
     /**
      * Holds an atomic reference to the listener installed.
      */
-    private final AtomicReference _messageListener = new AtomicReference();
+    private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
     /**
      * The consumer tag allows us to close the consumer by sending a jmsCancel method to the
@@ -83,13 +84,17 @@
 
     /**
      * Used in the blocking receive methods to receive a message from
-     * the Session thread. Argument true indicates we want strict FIFO semantics
+     * the Session thread.
+     * <p/>
+     * Or to notify of errors
+     * <p/>
+     * Argument true indicates we want strict FIFO semantics
      */
     private final ArrayBlockingQueue _synchronousQueue;
 
     private MessageFactoryRegistry _messageFactory;
 
-    private AMQSession _session;
+    private final AMQSession _session;
 
     private AMQProtocolHandler _protocolHandler;
 
@@ -146,8 +151,8 @@
     private Thread _receivingThread;
 
     /**
-     *  autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
-     *  on the queue.  This is used for queue browsing.
+     * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+     * on the queue.  This is used for queue browsing.
      */
     private boolean _autoClose;
     private boolean _closeWhenNoMessages;
@@ -190,8 +195,8 @@
 
     public MessageListener getMessageListener() throws JMSException
     {
-    	checkPreConditions();
-        return (MessageListener) _messageListener.get();
+        checkPreConditions();
+        return _messageListener.get();
     }
 
     public int getAcknowledgeMode()
@@ -204,7 +209,7 @@
         return _messageListener.get() != null;
     }
 
-    public void setMessageListener(MessageListener messageListener) throws JMSException
+    public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
     	checkPreConditions();
 
@@ -221,7 +226,7 @@
         if (_session.isStopped())
         {
             _messageListener.set(messageListener);
-            _logger.debug("Message listener set for destination " + _destination);
+            _logger.debug("Session stopped : Message listener set for destination " + _destination);
         }
         else
         {
@@ -237,14 +242,30 @@
 
             if (messageListener != null)
             {
-                //handle case where connection has already been started, and the dispatcher is blocked
-                //doing a put on the _synchronousQueue
-                AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
-                if (jmsMsg != null)
+                //handle case where connection has already been started, and the dispatcher has alreaded started
+                // putting values on the _synchronousQueue
+
+                synchronized (_session)
                 {
-                    preApplicationProcessing(jmsMsg);
-                    messageListener.onMessage(jmsMsg);
-                    postDeliver(jmsMsg);
+                    //Pause Dispatcher
+                    _session.doDispatcherTask(new DispatcherCallback(this)
+                    {
+                        public void whilePaused(Queue<MessageConsumerPair> reprocessQueue)
+                        {
+                            // Prepend messages in _synchronousQueue to dispatcher queue
+                            _logger.debug("ReprocessQueue current size:" + reprocessQueue.size());
+                            for (Object item : _synchronousQueue)
+                            {
+                                reprocessQueue.offer(new MessageConsumerPair(_consumer, item));
+                            }
+                            _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size());
+
+                            // Set Message Listener
+                            _logger.debug("Set Message Listener");
+                            _messageListener.set(messageListener);                            
+                        }
+                    }
+                    );                    
                 }
             }
         }
@@ -498,7 +519,9 @@
      */
     void notifyMessage(UnprocessedMessage messageFrame, int channelId)
     {
-        if (_logger.isDebugEnabled())
+        final boolean debug = _logger.isDebugEnabled();
+
+        if (debug)
         {
             _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
         }
@@ -509,11 +532,37 @@
                                                                           messageFrame.contentHeader,
                                                                           messageFrame.bodies);
 
-            _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+            if (debug)
+            {
+                _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
+            }
             jmsMessage.setConsumer(this);
 
             preDeliver(jmsMessage);
 
+            notifyMessage(jmsMessage, channelId);
+        }
+        catch (Exception e)
+        {
+            if (e instanceof InterruptedException)
+            {
+                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+            }
+            else
+            {
+                _logger.error("Caught exception (dump follows) - ignoring...", e);
+            }
+        }
+    }
+
+    /**
+     * @param jmsMessage this message has already been processed so can't redo preDeliver
+     * @param channelId
+     */
+    public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+    {
+        try
+        {
             if (isMessageListenerSet())
             {
                 //we do not need a lock around the test above, and the dispatch below as it is invalid
@@ -524,6 +573,7 @@
             }
             else
             {
+                //This shouldn't be possible.
                 _synchronousQueue.put(jmsMessage);
             }
         }
@@ -531,11 +581,11 @@
         {
             if (e instanceof InterruptedException)
             {
-                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
+                _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
             }
             else
             {
-                _logger.error("Caught exception (dump follows) - ignoring...", e);
+                _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
             }
         }
     }
@@ -619,6 +669,8 @@
     void notifyError(Throwable cause)
     {
         _closed.set(true);
+
+        //QPID-293 can "request redelivery of this error through dispatcher"
 
         // we have no way of propagating the exception to a message listener - a JMS limitation - so we
         // deal with the case where we have a synchronous receive() waiting for a message to arrive

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java?view=auto&rev=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java Wed Jan 17 05:31:48 2007
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.client;
+
+import java.util.Queue;
+
+public abstract class DispatcherCallback
+{
+    BasicMessageConsumer _consumer;
+
+    public DispatcherCallback(BasicMessageConsumer mc)
+    {
+        _consumer = mc;
+    }
+
+    abstract public void whilePaused(Queue<MessageConsumerPair> reprocessQueue);
+
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/DispatcherCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java?view=auto&rev=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java Wed Jan 17 05:31:48 2007
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.client;
+
+public class MessageConsumerPair
+{
+    BasicMessageConsumer _consumer;
+    Object _item;
+
+    public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
+    {
+        _consumer = consumer;
+        _item = item;
+    }
+
+    public BasicMessageConsumer getConsumer()
+    {
+        return _consumer;
+    }
+
+    public Object getItem()
+    {
+        return _item;
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Wed Jan 17 05:31:48 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.framing.AMQMethodBody;
 
 public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -49,6 +50,7 @@
     /**
      * This method is called by the MINA dispatching thread. Note that it could
      * be called before blockForFrame() has been called.
+     *
      * @param evt the frame event
      * @return true if the listener has dealt with this frame
      * @throws AMQException
@@ -106,11 +108,16 @@
         {
             if (_error instanceof AMQException)
             {
-                throw (AMQException)_error;
+                throw(AMQException) _error;
+            }
+            else if (_error instanceof FailoverException)
+            {
+                // This should ensure that FailoverException is not wrapped and can be caught.
+                throw(FailoverException) _error;  // needed to expose FailoverException.
             }
             else
             {
-                throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+                throw new AMQException("Woken up due to " + _error.getClass(), _error);
             }
         }
 
@@ -120,6 +127,7 @@
     /**
      * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
      * class to avoid code repetition but again is only called by the MINA dispatcher thread.
+     *
      * @param e
      */
     public void error(Exception e)

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Wed Jan 17 05:31:48 2007
@@ -85,7 +85,7 @@
             }
             else
             {
-                _logger.warn("No Provider URL specified.");
+                _logger.info("No Provider URL specified.");
             }
         }
         catch (IOException ioe)

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=auto&rev=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Wed Jan 17 05:31:48 2007
@@ -0,0 +1,200 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class);
+
+    Context _context;
+
+    private static final int MSG_COUNT = 6;
+    private int receivedCount1 = 0;
+    private int receivedCount2 = 0;
+    private Connection _clientConnection;
+    private MessageConsumer _consumer1;
+    private MessageConsumer _consumer2;
+
+    private boolean _testAsync;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+        
+        env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client 1
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _consumer1 = clientSession1.createConsumer(queue);
+
+        //Create Client 2
+        Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _consumer2 = clientSession2.createConsumer(queue);
+
+        //Create Producer
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.send(producerSession.createTextMessage("Message " + msg));
+        }
+
+        producerConnection.close();
+
+        _testAsync = false;
+    }
+
+    protected void tearDown() throws Exception
+    {
+        //Should have recieved all async messages
+        if (_testAsync)
+        {
+            assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+        }
+        _clientConnection.close();        
+
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    public void testRecieveC1thenC2() throws Exception
+    {
+
+        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+        {
+
+            assertTrue(_consumer1.receive() != null);
+        }
+
+        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+        {
+            assertTrue(_consumer2.receive() != null);
+        }
+    }
+
+    public void testRecieveInterleaved() throws Exception
+    {
+
+        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+        {
+            assertTrue(_consumer1.receive() != null);
+            assertTrue(_consumer2.receive() != null);
+        }
+    }
+
+
+    public void testAsynchronousRecieve() throws Exception
+    {
+        _testAsync = true;
+
+        _consumer1.setMessageListener(new MessageListener()
+        {
+            public void onMessage(Message message)
+            {
+                _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
+
+                receivedCount1++;
+            }
+        });
+
+        _consumer2.setMessageListener(new MessageListener()
+        {
+            public void onMessage(Message message)
+            {
+                _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+                receivedCount2++;
+            }
+        });
+
+
+        _logger.info("Waiting 3 seconds for messages");
+
+        try
+        {
+            Thread.sleep(6000);
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+
+    }
+
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?view=auto&rev=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (added)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Wed Jan 17 05:31:48 2007
@@ -0,0 +1,164 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerTest extends TestCase implements MessageListener
+{
+    private static final Logger _logger = Logger.getLogger(MessageListenerTest.class);
+
+    Context _context;
+
+    private static final int MSG_COUNT = 5;
+    private int receivedCount = 0;
+    private MessageConsumer _consumer;
+    private Connection _clientConnection;
+    private boolean _testAsync;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+        
+        env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/tests?brokerlist='vm://:1'");
+        env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientConnection.start();
+
+        Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        _consumer = clientSession.createConsumer(queue);
+
+        //Create Producer
+
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.send(producerSession.createTextMessage("Message " + msg));
+        }
+
+        producerConnection.close();
+
+        _testAsync = false;
+    }
+
+    protected void tearDown() throws Exception
+    {
+        //Should have recieved all async messages
+        if (_testAsync)
+        {
+            assertEquals(MSG_COUNT, receivedCount);
+        }
+        _clientConnection.close();
+
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+    }
+
+
+    public void testSynchronousRecieve() throws Exception
+    {
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            assertTrue(_consumer.receive() != null);
+        }
+    }
+
+    public void testAsynchronousRecieve() throws Exception
+    {
+        _testAsync = true;
+
+        _consumer.setMessageListener(this);
+
+
+        _logger.info("Waiting 3 seconds for messages");
+
+        try
+        {
+            Thread.sleep(2000);
+        }
+        catch (InterruptedException e)
+        {
+            //do nothing
+        }
+
+    }
+
+    public void onMessage(Message message)
+    {
+        _logger.info("Received Message(" + receivedCount + "):" + message);
+
+        receivedCount++;
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(MessageListenerTest.class);
+    }
+}

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=497016&r1=497015&r2=497016
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Wed Jan 17 05:31:48 2007
@@ -23,7 +23,12 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
+
 
 import javax.jms.*;
 
@@ -47,10 +52,12 @@
     private Session testSession;
     private MessageConsumer testConsumer1;
     private MessageConsumer testConsumer2;
+    private static final Logger _logger = Logger.getLogger(TransactedTest.class);
 
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         queue1 = new AMQQueue("Q1", false);
         queue2 = new AMQQueue("Q2", false);
 
@@ -86,6 +93,7 @@
         con.close();
         testCon.close();
         prepCon.close();
+        TransportConnection.killAllVMBrokers();
         super.tearDown();
     }
 
@@ -132,6 +140,84 @@
         assertTrue(null == testConsumer2.receive(1000));
     }
 
+
+    public void testResendsMsgsAfterSessionClose() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+        Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        AMQQueue queue3 = new AMQQueue("Q3", false);
+        MessageConsumer consumer = consumerSession.createConsumer(queue3);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+        Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue3);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        producerSession.commit();
+
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+
+        tm.acknowledge();
+        consumerSession.commit();
+
+        _logger.info("Received and acknowledged first message");
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        tm = (TextMessage) consumer.receive(1000);
+        assertNotNull(tm);
+        _logger.info("Received all four messages. Closing connection with three outstanding messages");
+
+        consumerSession.close();
+
+        consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+
+        consumer = consumerSession.createConsumer(queue3);
+
+        // no ack for last three messages so when I call recover I expect to get three messages back
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg2", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg3", tm.getText());
+
+        tm = (TextMessage) consumer.receive(3000);
+        assertNotNull(tm);
+        assertEquals("msg4", tm.getText());
+
+        _logger.info("Received redelivery of three messages. Acknowledging last message");
+        tm.acknowledge();
+        consumerSession.commit();
+        _logger.info("Calling acknowledge with no outstanding messages");
+        // all acked so no messages to be delivered
+
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+
+        con.close();
+        con2.close();
+
+    }
+
+
+
+
     private void expect(String text, Message msg) throws JMSException
     {
         assertTrue(msg instanceof TextMessage);
@@ -140,6 +226,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+        return new junit.framework.TestSuite(TransactedTest.class);
     }
 }