You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/05 13:10:29 UTC

svn commit: r682672 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ perftests/src/main/java/org/apache/qpid/requestreply/ systests/src/main/java/org/apache/qpid/test/client/failover/

Author: aidan
Date: Tue Aug  5 04:10:28 2008
New Revision: 682672

URL: http://svn.apache.org/viewvc?rev=682672&view=rev
Log:
QPID-1206: Fix failover and failover tests

AMQConnection: remove dead and confusingly misnamed method

AMQSession: rename failedOver to failedOverDirty to convey actual usage, only set it if we failed over while dirty. Ewww!

BasicMessageConsumer: if we're in client ack mode, mark as dirty when we receive a message

PingPongProducer: calculate expected replies properly if we fail after a send or before a commit

FailoverTest: test transacted case

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=682672&r1=682671&r2=682672&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Aug  5 04:10:28 2008
@@ -1322,24 +1322,6 @@
         _sessions.remove(channelId);
     }
 
-    /**
-     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
-     * The caller must hold the failover mutex before calling this method.
-     */
-    public void resubscribeSesssions() throws JMSException, AMQException, FailoverException
-    {
-        ArrayList sessions = new ArrayList(_sessions.values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
-        for (Iterator it = sessions.iterator(); it.hasNext();)
-        {
-            AMQSession s = (AMQSession) it.next();
-            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
-            s.resubscribe();
-            s.setFlowControl(true);
-        }
-    }
-
     public String toString()
     {
         StringBuffer buf = new StringBuffer("AMQConnection:\n");

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=682672&r1=682671&r2=682672&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Aug  5 04:10:28 2008
@@ -368,8 +368,8 @@
 
     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
     private boolean _dirty;
-    /** Has failover occured on this session */
-    private boolean _failedOver;
+    /** Has failover occured on this session with outstanding actions to commit? */
+    private boolean _failedOverDirty;
 
     private static final class FlowControlIndicator
     {
@@ -740,6 +740,7 @@
             }
             // Commits outstanding messages and acknowledgments
             sendCommit();
+            markClean();
         }
         catch (AMQException e)
         {
@@ -1796,7 +1797,10 @@
      */
     void resubscribe() throws AMQException
     {
-        _failedOver = true;
+        if (_dirty)
+        {
+            _failedOverDirty = true;
+        }
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -2586,7 +2590,7 @@
     public void markClean()
     {
         _dirty = false;
-        _failedOver = false;
+        _failedOverDirty = false;
     }
 
     /**
@@ -2596,7 +2600,7 @@
      */
     public boolean hasFailedOver()
     {
-        return _failedOver;
+        return _failedOverDirty;
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=682672&r1=682671&r2=682672&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Aug  5 04:10:28 2008
@@ -730,18 +730,9 @@
         {
             if (isMessageListenerSet())
             {
-                // we do not need a lock around the test above, and the dispatch below as it is invalid
-                // for an application to alter an installed listener while the session is started
-                // synchronized (_closed)
-                {
-                    // if (!_closed.get())
-                    {
-
-                        preApplicationProcessing(jmsMessage);
-                        getMessageListener().onMessage(jmsMessage);
-                        postDeliver(jmsMessage);
-                    }
-                }
+                preApplicationProcessing(jmsMessage);
+                getMessageListener().onMessage(jmsMessage);
+                postDeliver(jmsMessage);
             }
             else
             {
@@ -802,7 +793,7 @@
                 {
                     _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 }
-
+                _session.markDirty();
                 break;
 
             case Session.DUPS_OK_ACKNOWLEDGE:

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=682672&r1=682671&r2=682672&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Aug  5 04:10:28 2008
@@ -1673,11 +1673,9 @@
      */
     public int getExpectedNumPings(int numpings)
     {
-        // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
-
-        // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
-
-        return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
+        // Wow, I'm freaking sorry about this return here...
+        return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) *
+                                    (_isPubSub ? getConsumersPerDestination() : 1);
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java?rev=682672&r1=682671&r2=682672&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java Tue Aug  5 04:10:28 2008
@@ -98,7 +98,7 @@
         super.tearDown();
     }
 
-    private void consumeMessages(int toConsume) throws JMSException
+    private void consumeMessages(int toConsume, boolean transacted) throws JMSException
     {
         Message msg;
         for (int i = 0; i < toConsume; i++)
@@ -107,31 +107,43 @@
             assertNotNull("Message " + i + " was null!", msg);
             assertEquals("message " + i, ((TextMessage) msg).getText());
         }
+        if (transacted) {
+            consumerSession.commit();
+        }
     }
 
-    private void sendMessages(int totalMessages) throws JMSException
+    private void sendMessages(int totalMessages, boolean transacted) throws JMSException
     {
         for (int i = 0; i < totalMessages; i++)
         {
             producer.send(producerSession.createTextMessage("message " + i));
         }
+        if (transacted)
+        {
+            producerSession.commit();
+        }
     }
 
     public void testP2PFailover() throws Exception
     {
-        testP2PFailover(NUM_MESSAGES, true);
+        testP2PFailover(NUM_MESSAGES, true, false);
     }
 
     public void testP2PFailoverWithMessagesLeft() throws Exception
     {
-        testP2PFailover(NUM_MESSAGES, false);
+        testP2PFailover(NUM_MESSAGES, false, false);
+    }
+
+    public void testP2PFailoverTransacted() throws Exception
+    {
+        testP2PFailover(NUM_MESSAGES, true, false);
     }
 
-    private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException, NamingException
+    private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
     {
         Message msg = null;
-        init(false, Session.AUTO_ACKNOWLEDGE);
-        sendMessages(totalMessages);
+        init(transacted, Session.AUTO_ACKNOWLEDGE);
+        sendMessages(totalMessages, transacted);
 
         // Consume some messages
         int toConsume = totalMessages;
@@ -140,7 +152,7 @@
             toConsume = totalMessages / 2;
         }
 
-        consumeMessages(toConsume);
+        consumeMessages(toConsume, transacted);
 
         _logger.info("Failing over");
 
@@ -150,8 +162,8 @@
 
         assertNull("Should not have received message from new broker!", msg);
         // Check that messages still sent / received
-        sendMessages(totalMessages);
-        consumeMessages(totalMessages);
+        sendMessages(totalMessages, transacted);
+        consumeMessages(totalMessages, transacted);
     }
 
     private void causeFailure(long delay)
@@ -173,7 +185,7 @@
     public void testClientAckFailover() throws Exception
     {
         init(false, Session.CLIENT_ACKNOWLEDGE);
-        sendMessages(1);
+        sendMessages(1, false);
         Message msg = consumer.receive();
         assertNotNull("Expected msgs not received", msg);