You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/08/25 17:31:30 UTC

svn commit: r569688 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: client/JMSTestCase.java jms/MessageConsumerImpl.java jms/QpidMessageListener.java jms/SessionImpl.java

Author: rajith
Date: Sat Aug 25 08:31:30 2007
New Revision: 569688

URL: http://svn.apache.org/viewvc?rev=569688&view=rev
Log:
I provided a fix for the deadlock issue in MessageConsumerImpl.

Here is the deadlock issue
---------------------------
The internal receive thread acquires the _incomingMessageLock and blocks on sync().
The MINA thread enters onMessage() and blocks while trying to acquire the _incomingMessageLock.
Since the MINA thread doesn't return, it can't process the execution.complete() sent by the broker.
Since the execution.complete() doesn't get processed, the sync() doesn't return.
Hence the deadlock.

Solution
----------
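I rewrote the receive logic using a LinkedBlockingQueue, leveraging the application thread that calls the receive methods: onMessage() now only places the incoming message on the queue, and the application thread takes it from the queue inside receive(), so the MINA thread no longer has to acquire the _incomingMessageLock to deliver a synchronous message.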


Removed:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java Sat Aug 25 08:31:30 2007
@@ -22,8 +22,8 @@
             msg.writeInt(123);
             prod.send(msg);
             
-            javax.jms.Message m = cons.receive();
-            System.out.println(m);
+            javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+            System.out.println("Data : " + m.readInt());
             
         }
         catch(Exception e)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Sat Aug 25 08:31:30 2007
@@ -17,24 +17,30 @@
  */
 package org.apache.qpidity.jms;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
 
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Option;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
 import org.apache.qpidity.client.MessagePartListener;
 import org.apache.qpidity.client.util.MessagePartListenerAdapter;
 import org.apache.qpidity.exchange.ExchangeDefaults;
-
-import javax.jms.*;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
 
 /**
  * Implementation of JMS message consumer
  */
-public class MessageConsumerImpl extends MessageActor implements MessageConsumer
+public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
 {
     // we can receive up to 100 messages for an asynchronous listener
     public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -91,9 +97,9 @@
      * Nether exceed MAX_MESSAGE_TRANSFERRED
      */
     private int _messageAsyncrhonouslyReceived = 0;
-
-    private AtomicBoolean _messageReceived = new AtomicBoolean();
-
+    
+    private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
+    
     //----- Constructors
     /**
      * Create a new MessageProducerImpl.
@@ -120,11 +126,8 @@
         _subscriptionName = subscriptionName;
         _isStopped = getSession().isStopped();
         // let's create a message part assembler
-        /**
-         * A Qpid message listener that pushes messages to this consumer session when this consumer is
-         * asynchronous or directly to this consumer when it is synchronously accessed.
-         */
-        MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+        
+        MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
 
         if (destination instanceof Queue)
         {
@@ -183,10 +186,8 @@
         // this will prevent the broker from sending more than one message
         // When a messageListener is set the flow will be adjusted.
         // until then we assume it's for synchronous message consumption
-        getSession().getQpidSession()
-                .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
-        getSession().getQpidSession().sync();
+        requestCredit(1);
+        requestSync();
         // check for an exception
         if (getSession().getCurrentException() != null)
         {
@@ -266,9 +267,7 @@
             getSession().getQpidSession().messageStop(getMessageActorID());
         }
         _messageAsyncrhonouslyReceived = 0;
-        getSession().getQpidSession()
-                .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                             MAX_MESSAGE_TRANSFERRED);
+        requestCredit(MAX_MESSAGE_TRANSFERRED);
     }
 
     /**
@@ -281,7 +280,28 @@
      */
     public Message receive() throws JMSException
     {
-        return receive(0);
+        // Check if we can get a message immediately
+        Message result;
+        result = receiveNoWait();
+        
+        if(result != null)
+        {
+            return result;
+        }
+        
+        try
+        {
+            // Now issue a credit and wait for the broker to send a message
+            // IMO no point doing a credit() flush() and sync() in a loop.
+            // This will only overload the broker. After the initial try we can wait
+            // for the broker to send a message when it gets one
+            requestCredit(1);
+            return (Message)_queue.take();
+        }
+        catch (Exception e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
     }
 
     /**
@@ -297,20 +317,35 @@
      */
     public Message receive(long timeout) throws JMSException
     {
+        checkClosed();
+        checkIfListenerSet();
         if (timeout < 0)
         {
             throw new JMSException("Invalid timeout value: " + timeout);
         }
+        
         Message result;
         try
         {
-            result = internalReceive(timeout);
+            // first check if we have any in the queue already
+            result = (Message)_queue.poll();
+            if(result == null)
+            {
+                requestCredit(1);
+                requestFlush();
+                // We shouldn't do a sync(). Bcos the timeout can happen
+                // before the sync() returns
+                return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+            }
+            else
+            {
+                return result;
+            }
         }
         catch (Exception e)
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
-        return result;
     }
 
     /**
@@ -321,138 +356,56 @@
      */
     public Message receiveNoWait() throws JMSException
     {
+        checkClosed();
+        checkIfListenerSet();
         Message result;
         try
         {
-            result = internalReceive(-1);
+            // first check if we have any in the queue already
+            result = (Message)_queue.poll();
+            if(result == null)
+            {
+                requestCredit(1);
+                requestFlush();
+                requestSync();
+                return (Message)_queue.poll();
+            }
+            else
+            {
+                return result;
+            }            
         }
         catch (Exception e)
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
-        return result;
     }
-
-    // not public methods
-
-    /**
-     * Receive a synchronous message
-     * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
-     * is closed.
-     * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
-     * is closed)
-     * <p> A timeout less than 0 returns the next message or null if one is not available.
-     *
-     * @param timeout The timeout value (in milliseconds)
-     * @return the next message or null if one is not available.
-     * @throws Exception If receiving the next message fails due to some internal error.
-     */
-    private Message internalReceive(long timeout) throws Exception
+    
+    // not public methods    
+    private void requestCredit(int units)
     {
-        checkNotClosed();
-        Message result = null;
-
-        if (_messageListener != null)
-        {
-            throw new javax.jms.IllegalStateException("A listener has already been set.");
-        }
-
-        synchronized (_incomingMessageLock)
-        {
-            // This indicate to the delivery thread to deliver the message to this consumer
-            // as it can happens that a message is delivered after a receive operation as returned.
-            _isReceiving = true;
-            boolean blockingReceived = timeout == 0;
-            if (!_isStopped)
-            {
-                // if this consumer is stopped then this will be call when starting
-                requestOneMessage();
-                //When sync() returns we know whether we have received a message or not.
-                System.out.println("Internal receive  -- Called sync()");
-                getSession().getQpidSession().sync();
-                System.out.println("Internal receive  -- Returned from sync()");
-            }
-            if (_messageReceived.get() && timeout < 0)
-            {
-                // this is a nowait and we havent received a message then we must immediatly return
-                result = null;
-            }
-            else
-            {
-                boolean messageReceived = false;
-                while (!messageReceived)
-                {
-                    long timeBeforeWait = 0;
-                    while (_incomingMessage == null && !_isClosed)
-                    {
-                        if (!blockingReceived)
-                        {
-                            timeBeforeWait = System.currentTimeMillis();
-                        }
-                        try
-                        {
-                            _incomingMessageLock.wait(timeout);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    if (_incomingMessage != null)
-                    {
-                        result = (Message) _incomingMessage;
-                        // tell the session that a message is inprocess
-                        getSession().preProcessMessage(_incomingMessage);
-                        // tell the session to acknowledge this message (if required)
-                        getSession().acknowledgeMessage(_incomingMessage);
-                        _incomingMessage.afterMessageReceive();
-                        messageReceived = true;
-                    }
-                    else
-                    {
-                        //now setup the new timeout
-                        if (!blockingReceived)
-                        {
-                            timeout = timeout - (System.currentTimeMillis() - timeBeforeWait);
-                        }
-                        if (!_isClosed)
-                        {
-                            // we need to request a new message
-                            requestOneMessage();
-                            getSession().getQpidSession().sync();
-                            if (_messageReceived.get() && timeout < 0)
-                            {
-                                // we are waiting for too long and we haven't received a proper message
-                                result = null;
-                                messageReceived = true;
-                            }
-                        }
-                    }
-                }
-                _incomingMessage = null;
-            }
-            // We now release any message received for this consumer
-            _isReceiving = false;
-            getSession().testQpidException();
-        }
-        return result;
+        getSession().getQpidSession()
+        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
     }
-
-    /**
-     * Request a single message
-     */
-    private void requestOneMessage()
+    
+    private void requestFlush()
+    {
+        getSession().getQpidSession().messageFlush(getMessageActorID());
+    }
+        
+    private void requestSync()
+    {
+        getSession().getQpidSession().sync();
+    }
+    
+    private void checkClosed() throws JMSException
     {
-        if (_logger.isDebugEnabled())
+        if(_isStopped)
         {
-            _logger.debug("Requesting a single message");
+            throw new JMSException("Session is closed");
         }
-        getSession().getQpidSession()
-                .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-        getSession().getQpidSession().messageFlush(getMessageActorID());
-        _messageReceived.set(false);
     }
-
+    
     /**
      * Stop the delivery of messages to this consumer.
      * <p>For asynchronous receiver, this operation blocks until the message listener
@@ -475,93 +428,42 @@
     {
         synchronized (_incomingMessageLock)
         {
-            _isStopped = false;
-            if (_isReceiving)
-            {
-                // there is a synch call waiting for a message to be delivered
-                // so tell the broker to deliver a message
-                getSession().getQpidSession()
-                        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                     1);
-                getSession().getQpidSession().messageFlush(getMessageActorID());
-            }
+            _isStopped = false;            
         }
     }
 
-    /**
-     * Deliver a message to this consumer.
-     *
-     * @param message The message delivered to this consumer.
-     */
-    protected synchronized void onMessage(QpidMessage message)
-    {        
+    public void onMessage(org.apache.qpidity.api.Message message)
+    {
         try
         {
-            // if there is a message selector then we need to evaluate it.
-            boolean messageOk = true;
-            if (_messageSelector != null)
+            QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+            if (_messageListener == null)
             {
-                messageOk = _filter.matches((Message) message);                
+                _queue.offer(jmsMessage);
             }
-            
-            System.out.println("Received a message- onMessage in message consumer Impl");
-            if (!messageOk && _preAcquire)
+            else
             {
-                // this is the case for topics
-                // We need to ack this message
-                System.out.println("onMessage - trying to ack message");                
-                acknowledgeMessage(message);
-                System.out.println("onMessage - acked message");
-            }
-            // now we need to acquire this message if needed
-            // this is the case of queue with a message selector set
-            if (!_preAcquire && messageOk)
-            {
-                System.out.println("onMessage - trying to acquire message");
-                messageOk = acquireMessage(message);
-                System.out.println("onMessage - acquired message");
+                // I still think we don't need that additional thread in SessionImpl
+                // if the Application blocks on a message thats fine
+                // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
+                notifyMessageListener(jmsMessage);
             }
+        }    
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+    
+   
+    public void notifyMessageListener(QpidMessage message)throws RuntimeException
+    {        
+        try
+        {
+            boolean messageOk = checkPreConditions(message);
 
-            // if this consumer is synchronous then set the current message and
-            // notify the waiting thread
-            if (_messageListener == null)
-            {
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("Received a message- onMessage in message consumer Impl");                    
-                }
-                synchronized (_incomingMessageLock)
-                {
-                    System.out.println("got incomming message lock");
-                    if (messageOk)
-                    {
-                        // we have received a proper message that we can deliver
-                        if (_isReceiving)
-                        {
-                            System.out.println("Is receiving true, setting message and notifying");
-                            _incomingMessage = message;
-                            _incomingMessageLock.notify();
-                        }
-                        else
-                        {
-                            // this message has been received after a received as returned
-                            // we need to release it
-                            releaseMessage(message);
-                        }
-                    }
-                    else
-                    {
-                        // oups the message did not match the selector or we did not manage to acquire it
-                        // If the receiver is still waiting for a message
-                        // then we need to request a new one from the server
-                        if (_isReceiving)
-                        {
-                            _incomingMessageLock.notify();
-                        }
-                    }
-                }
-            }
-            else
+            // only deliver the message if it is valid 
+            if (messageOk)
             {
                 _messageAsyncrhonouslyReceived++;
                 if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
@@ -569,56 +471,93 @@
                     // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
                     resetAsynchMessageReceived();
                 }
-                // only deliver the message if it is valid 
-                if (messageOk)
+                
+                preApplicationProcessing(message);
+                // The JMS specs says:
+                /* The result of a listener throwing a RuntimeException depends on the session?s
+                * acknowledgment mode.
+                ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+                * will be immediately redelivered. The number of times a JMS provider will
+                * redeliver the same message before giving up is provider-dependent.
+                ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+                * --- Transacted Session - the next message for the listener is delivered.
+                *
+                * The number of time we try redelivering the message is 0
+                **/
+               try
+                {
+                    
+                    _messageListener.onMessage((Message) message);
+                }
+                catch (RuntimeException re)
                 {
-                    // This is an asynchronous message
-                    // tell the session that a message is in process
-                    getSession().preProcessMessage(message);
-                    // If the session is transacted we need to ack the message first
-                    // This is because a message is associated with its tx only when acked
-                    if (getSession().getTransacted())
-                    {
-                        getSession().acknowledgeMessage(message);
-                    }
-                    // The JMS specs says:
-                    /* The result of a listener throwing a RuntimeException depends on the session?s
-                    * acknowledgment mode.
-                    ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
-                    * will be immediately redelivered. The number of times a JMS provider will
-                    * redeliver the same message before giving up is provider-dependent.
-                    ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
-                    * --- Transacted Session - the next message for the listener is delivered.
-                    *
-                    * The number of time we try redelivering the message is 0
-                    **/
-                    try
-                    {
-                        message.afterMessageReceive();
-                        _messageListener.onMessage((Message) message);
-                    }
-                    catch (RuntimeException re)
-                    {
-                        // do nothing as this message will not be redelivered
-                    }
-                    // If the session has been recovered we then need to redelivered this message
-                    if (getSession().isInRecovery())
-                    {
-                        releaseMessage(message);
-                    }
-                    else if (!getSession().getTransacted())
-                    {
-                        // Tell the jms Session to ack this message if required
-                        getSession().acknowledgeMessage(message);
-                    }
+                    // do nothing as this message will not be redelivered
                 }
             }
+            
         }
         catch (Exception e)
         {
             throw new RuntimeException(e.getMessage());
         }
     }
+    
+    private void checkIfListenerSet() throws javax.jms.IllegalStateException
+    {
+
+        if (_messageListener != null)
+        {
+            throw new javax.jms.IllegalStateException("A listener has already been set.");
+        }
+    }
+    
+    private void preApplicationProcessing(QpidMessage message)throws Exception
+    {
+        getSession().preProcessMessage(message);
+        // If the session is transacted we need to ack the message first
+        // This is because a message is associated with its tx only when acked
+        if (getSession().getTransacted())
+        {
+            getSession().acknowledgeMessage(message);
+        }
+        message.afterMessageReceive();
+    }
+                    
+    private boolean checkPreConditions(QpidMessage message)throws QpidException
+    {
+        boolean messageOk = true;
+        if (_messageSelector != null)
+        {
+            messageOk = _filter.matches((Message) message);     
+            if (!messageOk)
+            {
+                System.out.println("Message not OK, releasing");
+                releaseMessage(message);
+            }
+        }
+        
+        System.out.println("messageOk " + messageOk);
+        System.out.println("_preAcquire " + _preAcquire);
+        
+        if (!messageOk && _preAcquire)
+        {
+            // this is the case for topics
+            // We need to ack this message
+            System.out.println("filterMessage - trying to ack message");                
+            acknowledgeMessage(message);
+            System.out.println("filterMessage - acked message");
+        }
+        // now we need to acquire this message if needed
+        // this is the case of queue with a message selector set
+        if (!_preAcquire && messageOk)
+        {
+            System.out.println("filterMessage - trying to acquire message");
+            messageOk = acquireMessage(message);
+            System.out.println("filterMessage - acquired message");
+        }
+        
+        return messageOk;
+    }
 
     /**
      * Release a message
@@ -680,10 +619,5 @@
             getSession().getQpidSession().messageAcknowledge(ranges);
             getSession().testQpidException();
         }
-    }
-
-    public void notifyMessageReceived()
-    {
-        _messageReceived.set(true);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java Sat Aug 25 08:31:30 2007
@@ -1301,7 +1301,8 @@
                     {
                         try
                         {
-                            mc.onMessage(message.getMessage());
+                           // mc.onMessage(message.getMessage());
+                            mc.notifyMessageListener(message.getMessage());
                         }
                         catch (RuntimeException t)
                         {