You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/17 18:20:06 UTC

svn commit: r529666 - in /incubator/qpid/branches/M2/java/client/src: main/java/org/apache/qpid/client/ test/java/org/apache/qpid/client/ test/java/org/apache/qpid/test/unit/close/ test/java/org/apache/qpid/test/unit/topic/ test/java/org/apache/qpid/te...

Author: ritchiem
Date: Tue Apr 17 09:19:59 2007
New Revision: 529666

URL: http://svn.apache.org/viewvc?view=rev&rev=529666
Log:
QPID-455 Prefetched messages can cause problems with client tools.
AMQSession - suspend channel at startup until start() and recieve/setMessageListener are called.
BasicMessageConsumer - mainly style sheet changes
MessageListenerMultiConsumerTest - removed one test case as we cannot ensure round-robin effect at start up .. added test case for only c2 consuming when c1 does nothing.
MessageListenerTest - added new test that can demonstrate a further bug of message 'loss' when a receive is called only once before a message listener is set. Prefetched message end up on _SynchronousQueue regression of QPID-293 as of r501004.
MessageRequeueTest - Was missing a conn.start()
DurableSubscriptionTest - Removed blocking receives() so we don't block on failure
CommitRollbackTest - Text message was wrong on testGetThenDisconnect tests so adjusted



Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Apr 17 09:19:59 2007
@@ -202,6 +202,7 @@
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
 
     private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+    private AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
 
     private class Dispatcher extends Thread
     {
@@ -328,7 +329,7 @@
                         }
                     }
                     // Don't reject if we're already closing
-                    if(!_closed.get())
+                    if (!_closed.get())
                     {
                         rejectMessage(message, true);
                     }
@@ -999,7 +1000,7 @@
     }
 
     public BasicMessageProducer createProducer(Destination destination, boolean mandatory,
-                                          boolean immediate, boolean waitUntilSent)
+                                               boolean immediate, boolean waitUntilSent)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -1023,14 +1024,14 @@
     }
 
     private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory,
-                                                                   boolean immediate)
+                                                    boolean immediate)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, false);
     }
 
     private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
-                                                                   final boolean immediate, final boolean waitUntilSent)
+                                                    final boolean immediate, final boolean waitUntilSent)
             throws JMSException
     {
         return (BasicMessageProducer) new FailoverSupport()
@@ -1947,6 +1948,24 @@
         {
             _dispatcher.setConnectionStopped(initiallyStopped);
         }
+
+        if (!AMQSession.this._closed.get()
+            && AMQSession.this._startedAtLeastOnce.get()
+            && _firstDispatcher.getAndSet(false))
+        {
+            if (isSuspended())
+            {
+                try
+                {
+                    suspendChannel(false);
+                }
+                catch (AMQException e)
+                {
+                    _logger.info("Suspending channel threw an exception:" + e);
+                }
+            }
+        }
+
     }
 
     void stop() throws AMQException
@@ -1978,6 +1997,21 @@
         AMQShortString queueName = declareQueue(amqd, protocolHandler);
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
+
+        if (_dispatcher == null)
+        {
+            if (!isSuspended())
+            {
+                try
+                {
+                    suspendChannel(true);
+                }
+                catch (AMQException e)
+                {
+                    _logger.info("Suspending channel threw an exception:" + e);
+                }
+            }
+        }
 
         try
         {

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Apr 17 09:19:59 2007
@@ -140,9 +140,9 @@
     private List<StackTraceElement> _closedStack = null;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
-        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-        AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
-        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+                                   String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                                   AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                   boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
         _connection = connection;
@@ -219,7 +219,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination "
-                    + _destination);
+                              + _destination);
             }
         }
         else
@@ -468,7 +468,7 @@
                     if (_closedStack != null)
                     {
                         _logger.trace(_consumerTag + " close():"
-                            + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                                      + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
                         _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
                     }
                     else
@@ -481,9 +481,9 @@
                 {
                     // TODO: Be aware of possible changes to parameter order as versions change.
                     final AMQFrame cancelFrame =
-                        BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                            _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
-                            false); // nowait
+                            BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                                           _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
+                                                           false); // nowait
 
                     try
                     {
@@ -498,6 +498,7 @@
                     catch (AMQException e)
                     {
                         // _logger.error("Error closing consumer: " + e, e);
+                        e.printStackTrace();
                         JMSException jmse = new JMSException("Error closing consumer: " + e);
                         jmse.setLinkedException(e);
                         throw jmse;
@@ -540,7 +541,7 @@
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " markClosed():"
-                        + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                                  + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                     _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
                 }
                 else
@@ -572,9 +573,9 @@
         try
         {
             AbstractJMSMessage jmsMessage =
-                _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
-                    messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
-                    messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
+                    _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+                                                  messageFrame.getDeliverBody().redelivered, messageFrame.getDeliverBody().exchange,
+                                                  messageFrame.getDeliverBody().routingKey, messageFrame.getContentHeader(), messageFrame.getBodies());
 
             if (debug)
             {
@@ -659,15 +660,15 @@
         switch (_acknowledgeMode)
         {
 
-        case Session.PRE_ACKNOWLEDGE:
-            _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            break;
-
-        case Session.CLIENT_ACKNOWLEDGE:
-            // we set the session so that when the user calls acknowledge() it can call the method on session
-            // to send out the appropriate frame
-            msg.setAMQSession(_session);
-            break;
+            case Session.PRE_ACKNOWLEDGE:
+                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                break;
+
+            case Session.CLIENT_ACKNOWLEDGE:
+                // we set the session so that when the user calls acknowledge() it can call the method on session
+                // to send out the appropriate frame
+                msg.setAMQSession(_session);
+                break;
         }
     }
 
@@ -677,55 +678,55 @@
         switch (_acknowledgeMode)
         {
 
-        case Session.CLIENT_ACKNOWLEDGE:
-            if (isNoConsume())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.CLIENT_ACKNOWLEDGE:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
 
-        case Session.DUPS_OK_ACKNOWLEDGE:
-            if (++_outstanding >= _prefetchHigh)
-            {
-                _dups_ok_acknowledge_send = true;
-            }
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                if (++_outstanding >= _prefetchHigh)
+                {
+                    _dups_ok_acknowledge_send = true;
+                }
 
-            if (_outstanding <= _prefetchLow)
-            {
-                _dups_ok_acknowledge_send = false;
-            }
+                if (_outstanding <= _prefetchLow)
+                {
+                    _dups_ok_acknowledge_send = false;
+                }
 
-            if (_dups_ok_acknowledge_send)
-            {
-                if (!_session.isInRecovery())
+                if (_dups_ok_acknowledge_send)
                 {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    if (!_session.isInRecovery())
+                    {
+                        _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                    }
                 }
-            }
 
-            break;
+                break;
 
-        case Session.AUTO_ACKNOWLEDGE:
-            // we do not auto ack a message if the application code called recover()
-            if (!_session.isInRecovery())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
+            case Session.AUTO_ACKNOWLEDGE:
+                // we do not auto ack a message if the application code called recover()
+                if (!_session.isInRecovery())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
 
-            break;
+                break;
 
-        case Session.SESSION_TRANSACTED:
-            if (isNoConsume())
-            {
-                _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-            }
-            else
-            {
-                _receivedDeliveryTags.add(msg.getDeliveryTag());
-            }
+            case Session.SESSION_TRANSACTED:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _receivedDeliveryTags.add(msg.getDeliveryTag());
+                }
 
-            break;
+                break;
         }
     }
 
@@ -757,7 +758,7 @@
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " notifyError():"
-                        + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                                  + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
                     _logger.trace(_consumerTag + " previously" + _closedStack.toString());
                 }
                 else
@@ -877,7 +878,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Long tag = _receivedDeliveryTags.poll();
@@ -907,7 +908,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Iterator iterator = _synchronousQueue.iterator();
@@ -931,7 +932,7 @@
                 else
                 {
                     _logger.error("Queue contained a :" + o.getClass()
-                        + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+                                  + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                     iterator.remove();
                 }
             }

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Tue Apr 17 09:19:59 2007
@@ -65,6 +65,7 @@
 
     private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
 
+
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -122,30 +123,39 @@
         TransportConnection.killAllVMBrokers();
     }
 
+//    public void testRecieveC1thenC2() throws Exception
+//    {
+//
+//        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+//        {
+//
+//            assertTrue(_consumer1.receive() != null);
+//        }
+//
+//        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+//        {
+//            assertTrue(_consumer2.receive() != null);
+//        }
+//    }
 
-    public void testRecieveC1thenC2() throws Exception
+    public void testRecieveInterleaved() throws Exception
     {
-
-        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+        int msg = 0;
+        int MAX_LOOPS = MSG_COUNT * 2;
+        for (int loops = 0; msg < MSG_COUNT || loops < MAX_LOOPS; loops++)
         {
 
-            assertTrue(_consumer1.receive() != null);
+            if (_consumer1.receive(100) != null)
+            {
+                msg++;
+            }
+            if (_consumer2.receive(100) != null)
+            {
+                msg++;
+            }
         }
 
-        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-        {
-            assertTrue(_consumer2.receive() != null);
-        }
-    }
-
-    public void testRecieveInterleaved() throws Exception
-    {
-
-        for (int msg = 0; msg < MSG_COUNT / 2; msg++)
-        {
-            assertTrue(_consumer1.receive() != null);
-            assertTrue(_consumer2.receive() != null);
-        }
+        assertEquals("Not all messages received.", MSG_COUNT, msg);
     }
 
 
@@ -161,7 +171,7 @@
 
                 if (receivedCount1 == MSG_COUNT / 2)
                 {
-                    _allMessagesSent.countDown();                    
+                    _allMessagesSent.countDown();
                 }
 
             }
@@ -194,6 +204,15 @@
         }
 
         assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+    }
+
+    public void testRecieveC2Only() throws Exception
+    {
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg,
+                       _consumer2.receive(1000) != null);
+        }
     }
 
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Tue Apr 17 09:19:59 2007
@@ -144,6 +144,36 @@
 
     }
 
+    public void testRecieveTheUseMessageListener() throws Exception
+     {
+
+         _logger.error("Test disabled as initial receive is not called first");
+         // Perform initial receive to start connection
+//         assertTrue(_consumer.receive(2000) != null);
+//         receivedCount++;
+
+         // Sleep to ensure remaining 4 msgs end up on _synchronousQueue
+//         Thread.sleep(1000);
+
+         // Set the message listener and wait for the messages to come in.
+         _consumer.setMessageListener(this);
+
+         _logger.info("Waiting 3 seconds for messages");
+
+         try
+         {
+             _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+         }
+         catch (InterruptedException e)
+         {
+             //do nothing
+         }
+         //Should have recieved all async messages
+         assertEquals(MSG_COUNT, receivedCount);
+
+     }
+    
+
     public void onMessage(Message message)
     {
         _logger.info("Received Message(" + receivedCount + "):" + message);

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Apr 17 09:19:59 2007
@@ -330,7 +330,7 @@
     public void testRequeue() throws JMSException, AMQException, URLSyntaxException
     {
         int run = 0;
-        while (run < 10)
+//        while (run < 10)
         {
             run++;
 
@@ -350,17 +350,10 @@
             _logger.debug("Create Consumer");
             MessageConsumer consumer = session.createConsumer(q);
 
-            try
-            {
-                Thread.sleep(2000);
-            }
-            catch (InterruptedException e)
-            {
-                //
-            }
+            conn.start();
 
             _logger.debug("Receiving msg");
-            Message msg = consumer.receive(1000);
+            Message msg = consumer.receive(2000);
 
             assertNotNull("Message should not be null", msg);
 

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Apr 17 09:19:59 2007
@@ -100,7 +100,9 @@
         AMQTopic topic = new AMQTopic(con,"MyTopic");
         Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         MessageConsumer consumer1 = session1.createConsumer(topic);
-        MessageProducer producer = session1.createProducer(topic);
+
+        Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        MessageProducer producer = sessionProd.createProducer(topic);
 
         Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
@@ -112,12 +114,12 @@
         Message msg;
         msg = consumer1.receive();
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(100);
         assertEquals(null, msg);
 
         msg = consumer2.receive();
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(100);
         assertEquals(null, msg);
 
         consumer2.close();
@@ -127,14 +129,14 @@
 
         producer.send(session1.createTextMessage("B"));
 
-        msg = consumer1.receive();
+        msg = consumer1.receive(100);
         assertEquals("B", ((TextMessage) msg).getText());
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(100);
         assertEquals(null, msg);
 
-        msg = consumer3.receive();
+        msg = consumer3.receive(100);
         assertEquals("B", ((TextMessage) msg).getText());
-        msg = consumer3.receive(1000);
+        msg = consumer3.receive(100);
         assertEquals(null, msg);
 
         con.close();

Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=529666&r1=529665&r2=529666
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue Apr 17 09:19:59 2007
@@ -267,7 +267,7 @@
         assertTrue("session is not transacted", _pubSession.getTransacted());
 
         _logger.info("sending test message");
-        String MESSAGE_TEXT = "testGetThenDisconnect";
+        String MESSAGE_TEXT = "testGetThenRollback";
         _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
 
         _pubSession.commit();