You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2011/02/17 15:50:28 UTC

svn commit: r1071631 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/

Author: grkvlt
Date: Thu Feb 17 14:50:28 2011
New Revision: 1071631

URL: http://svn.apache.org/viewvc?rev=1071631&view=rev
Log:
QPID-3008: Fix failover behaviour in 0-10 for QueueBrowserAutoAckTest

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Feb 17 14:50:28 2011
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 
     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
     {
         List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
-        _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+        _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
         for (AMQSession s : sessions)
         {
-            ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
             s.resubscribe();
         }
     }
 
-
     public void closeConnection(long timeout) throws JMSException, AMQException
     {
         try
@@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 
         ConnectionClose close = exc.getClose();
         if (close == null)
         {
+            _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+            
             try
             {
                 if (_conn.firePreFailover(false) && _conn.attemptReconnection())
                 {
                     _conn.failoverPrep();
-                    _qpidConnection.resume();
+                    _conn.resubscribeSessions();
                     _conn.fireFailoverComplete();
                     return;
                 }
@@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 
             {
                 _logger.error("error during failover", e);
             }
+            finally
+            {
+                _conn.getProtocolHandler().getFailoverLatch().countDown();
+                _conn.getProtocolHandler().setFailoverLatch(null);
+            }
         }
 
         ExceptionListener listener = _conn._exceptionListener;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 17 14:50:28 2011
@@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQ
     public void resumed(Session ssn)
     {
         _qpidConnection = ssn.getConnection();
-        try
-        {
-            resubscribe();
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
-        }
     }
 
     public void message(Session ssn, MessageTransfer xfr)

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Thu Feb 17 14:50:28 2011
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.test.client;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -41,8 +41,6 @@ import java.util.Random;
 
 public class QueueBrowserAutoAckTest extends FailoverBaseCase
 {
-    private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class);
-
     protected Connection _clientConnection;
     protected Session _clientSession;
     protected Queue _queue;
@@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest ext
     {
         super.setUp();
 
-
         //Create Client
         _clientConnection = getConnection();
-
         _clientConnection.start();
 
         setupSession();
@@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest ext
         closeBrowserBeforeAfterGetNext(10);
 
         validate(messages);
-
     }
 
     /**
@@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest ext
     {
         int messages = 5;
 
-
         sendMessages("connection1", messages);
         if (!CLUSTERED)
         {
             sendMessages("connection2", messages);
         }
 
-
         checkQueueDepth(messages);
 
-
         _logger.info("Creating Queue Browser");
-
         QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
 
         long queueDepth = 0;
@@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest ext
         }
         catch (AMQException e)
         {
+            fail("Caught exception getting queue depth: " + e.getMessage());
         }
 
         assertEquals("Session reports Queue depth not as expected", messages, queueDepth);
 
-
         int msgCount = 0;
-
         int failPoint = 0;
 
         failPoint = new Random().nextInt(messages) + 1;
 
         Enumeration msgs = queueBrowser.getEnumeration();
-
         while (msgs.hasMoreElements())
         {
             msgs.nextElement();
@@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest ext
         //Validate all messages still on Broker 1
         validate(messages);
     }
-
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org