You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2012/09/10 14:39:45 UTC

svn commit: r1382799 - in /qpid/trunk/qpid/java: bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/clie...

Author: robbie
Date: Mon Sep 10 12:39:44 2012
New Revision: 1382799

URL: http://svn.apache.org/viewvc?rev=1382799&view=rev
Log:
QPID-4289: Fix 0-8/0-9/0-9-1 failover issues

Applied patch from Philip Harvey <ph...@philharveyonline.com> and Oleksandr Rudyy <or...@gmail.com>

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java Mon Sep 10 12:39:44 2012
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQConnect
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestUtils;
 
 import com.sleepycat.je.rep.ReplicationConfig;
 
@@ -134,7 +135,10 @@ public class HAClusterBlackboxTest exten
 
         public void assertFailoverOccurs(long delay) throws InterruptedException
         {
-            _failoverLatch.await(delay, TimeUnit.MILLISECONDS);
+            if (!_failoverLatch.await(delay, TimeUnit.MILLISECONDS))
+            {
+                LOGGER.warn("Test thread dump:\n\n" + TestUtils.dumpThreads() + "\n");
+            }
             assertEquals("Failover did not occur", 0, _failoverLatch.getCount());
         }
 

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java Mon Sep 10 12:39:44 2012
@@ -204,7 +204,7 @@ public class HATestClusterCreator
 
     public void stopNode(final int brokerPortNumber)
     {
-        _testcase.stopBroker(brokerPortNumber);
+        _testcase.killBroker(brokerPortNumber);
     }
 
     public void stopCluster() throws Exception

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Sep 10 12:39:44 2012
@@ -1080,7 +1080,7 @@ public class AMQConnection extends Close
         return _started;
     }
 
-    protected final boolean isConnected()
+    public final boolean isConnected()
     {
         return _connected;
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Sep 10 12:39:44 2012
@@ -90,12 +90,13 @@ public class AMQConnectionDelegate_8_0 i
 
     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
     {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Connecting to broker:" + brokerDetail);
+        }
         final Set<AMQState> openOrClosedStates =
                 EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
 
-
-        StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
-
         ConnectionSettings settings = brokerDetail.buildConnectionSettings();
         settings.setProtocol(brokerDetail.getTransport());
 
@@ -126,6 +127,8 @@ public class AMQConnectionDelegate_8_0 i
         OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
         NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), sslContext);
         _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
+
+        StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
         _conn.getProtocolHandler().getProtocolSession().init();
         // this blocks until the connection has been set up or when an error
         // has prevented the connection being set up

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Mon Sep 10 12:39:44 2012
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.handler;
 
+import java.nio.ByteBuffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Sender;
 
 public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
 {
@@ -91,18 +94,15 @@ public class ConnectionCloseMethodHandle
         }
         finally
         {
+            Sender<ByteBuffer> sender = session.getSender();
 
             if (error != null)
             {
                 session.notifyError(error);
-            }            
-
-            // Close the protocol Session, including any open TCP connections 
-            session.closeProtocolSession();
+            }
 
-            // Closing the session should not introduce a race condition as this thread will continue to propgate any
-            // exception in to the exceptionCaught method of the SessionHandler.
-            // Any sessionClosed event should occur after this.
+            // Close the open TCP connection
+            sender.close();
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Sep 10 12:39:44 2012
@@ -67,6 +67,7 @@ import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
@@ -210,48 +211,67 @@ public class AMQProtocolHandler implemen
         }
         else
         {
-            _logger.debug("Session closed called with failover state currently " + _failoverState);
-
-            // reconnetablility was introduced here so as not to disturb the client as they have made their intentions
-            // known through the policy settings.
-
-            if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
-            {
-                _logger.debug("FAILOVER STARTING");
-                if (_failoverState == FailoverState.NOT_STARTED)
-                {
-                    _failoverState = FailoverState.IN_PROGRESS;
-                    startFailoverThread();
-                }
-                else
-                {
-                    _logger.debug("Not starting failover as state currently " + _failoverState);
-                }
-            }
-            else
+            // Use a local variable to record whether fail-over is not allowed,
+            // in order to call AMQConnection#exceptionReceived outside of the synchronized block;
+            // otherwise it might deadlock with the failover mutex
+            boolean failoverNotAllowed = false;
+            synchronized (this)
             {
-                _logger.debug("Failover not allowed by policy."); // or already in progress?
-
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug(_connection.getFailoverPolicy().toString());
+                    _logger.debug("Session closed called with failover state " + _failoverState);
                 }
 
-                if (_failoverState != FailoverState.IN_PROGRESS)
+                // reconnectability was introduced here so as not to disturb the client, as they have made their intentions
+                // known through the policy settings.
+                if (_failoverState == FailoverState.NOT_STARTED)
                 {
-                    _logger.debug("sessionClose() not allowed to failover");
-                    _connection.exceptionReceived(new AMQDisconnectedException(
-                            "Server closed connection and reconnection " + "not permitted.",
-                            _stateManager.getLastException()));
+                    // close the sender
+                    try
+                    {
+                        _sender.close();
+                    }
+                    catch (Exception e)
+                    {
+                        _logger.warn("Exception occured on closing the sender", e);
+                    }
+                    if (_connection.failoverAllowed())
+                    {
+                        _failoverState = FailoverState.IN_PROGRESS;
+
+                        _logger.debug("FAILOVER STARTING");
+                        startFailoverThread();
+                    }
+                    else if (_connection.isConnected())
+                    {
+                        failoverNotAllowed = true;
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Failover not allowed by policy:" + _connection.getFailoverPolicy());
+                        }
+                    }
+                    else
+                    {
+                        _logger.debug("We are in process of establishing the initial connection");
+                    }
                 }
                 else
                 {
-                    _logger.debug("sessionClose() failover in progress");
+                    _logger.debug("Not starting the failover thread as state currently " + _failoverState);
                 }
             }
+
+            if (failoverNotAllowed)
+            {
+                _connection.exceptionReceived(new AMQDisconnectedException(
+                        "Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
+            }
         }
 
-        _logger.debug("Protocol Session [" + this + "] closed");
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Protocol Session [" + this + "] closed");
+        }
     }
 
     /** See {@link FailoverHandler} to see rationale for separate thread. */
@@ -297,14 +317,17 @@ public class AMQProtocolHandler implemen
      */
     public void exception(Throwable cause)
     {
-        if (_failoverState == FailoverState.NOT_STARTED)
+        boolean connectionClosed = (cause instanceof AMQConnectionClosedException || cause instanceof IOException);
+        if (connectionClosed)
         {
-            if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
+            _network.close();
+        }
+        FailoverState state = getFailoverState();
+        if (state == FailoverState.NOT_STARTED)
+        {
+            if (connectionClosed)
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
-                // this will attempt failover
-                _network.close();
-                closed();
             }
             else
             {
@@ -319,7 +342,7 @@ public class AMQProtocolHandler implemen
         }
         // we reach this point if failover was attempted and failed therefore we need to let the calling app
         // know since we cannot recover the situation
-        else if (_failoverState == FailoverState.FAILED)
+        else if (state == FailoverState.FAILED)
         {
             _logger.error("Exception caught by protocol handler: " + cause, cause);
 
@@ -329,6 +352,10 @@ public class AMQProtocolHandler implemen
             propagateExceptionToAllWaiters(amqe);
             _connection.exceptionReceived(cause);
         }
+        else
+        {
+            _logger.warn("Exception caught by protocol handler: " + cause, cause);
+        }
     }
 
     /**
@@ -792,14 +819,14 @@ public class AMQProtocolHandler implemen
         return _protocolSession;
     }
 
-    FailoverState getFailoverState()
+    synchronized FailoverState getFailoverState()
     {
         return _failoverState;
     }
 
-    public void setFailoverState(FailoverState failoverState)
+    public synchronized void setFailoverState(FailoverState failoverState)
     {
-        _failoverState = failoverState;
+        _failoverState= failoverState;
     }
 
     public byte getProtocolMajorVersion()
@@ -843,6 +870,11 @@ public class AMQProtocolHandler implemen
         _sender = sender;
     }
 
+    protected Sender<ByteBuffer> getSender()
+    {
+        return _sender;
+    }
+
     /** @param delay delay in seconds (not ms) */
     void initHeartbeats(int delay)
     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Sep 10 12:39:44 2012
@@ -48,6 +48,8 @@ import org.apache.qpid.transport.Transpo
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
+
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -372,6 +374,11 @@ public class AMQProtocolSession implemen
         }
     }
 
+    public Sender<ByteBuffer> getSender()
+    {
+        return _protocolHandler.getSender();
+    }
+
     public void failover(String host, int port)
     {
         _protocolHandler.failover(host, port);

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Mon Sep 10 12:39:44 2012
@@ -1313,7 +1313,7 @@ public class FailoverBehaviourTest exten
      * @param acknowledgeMode session acknowledge mode
      * @throws JMSException
      */
-    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
     {
         initDelayedFailover(acknowledgeMode);
 
@@ -1324,9 +1324,14 @@ public class FailoverBehaviourTest exten
 
         failBroker(getFailingPort());
 
+        // wait until failover is started
+        _failoverStarted.await(5, TimeUnit.SECONDS);
+
         // test whether session#close blocks while failover is in progress
         _consumerSession.close();
 
+        assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
+
         assertFailoverException();
     }
 
@@ -1360,10 +1365,8 @@ public class FailoverBehaviourTest exten
      * @param acknowledgeMode session acknowledge mode
      * @throws JMSException
      */
-    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
+    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
     {
-        setDelayedFailoverPolicy();
-
         QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
 
         @SuppressWarnings("unchecked")
@@ -1373,8 +1376,13 @@ public class FailoverBehaviourTest exten
 
         failBroker(getFailingPort());
 
+        // wait until failover is started
+        _failoverStarted.await(5, TimeUnit.SECONDS);
+
         browser.close();
 
+        assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
+
         assertFailoverException();
     }
 
@@ -1402,5 +1410,11 @@ public class FailoverBehaviourTest exten
         ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
         return failoverPolicy;
     }
-    
+
+    @Override
+    public void failBroker(int port)
+    {
+        killBroker(port);
+    }
+
 }

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java?rev=1382799&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java Mon Sep 10 12:39:44 2012
@@ -0,0 +1,255 @@
+package org.apache.qpid.client.failover;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQBrokerDetails;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestUtils;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.util.FileUtils;
+
+public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener
+{
+    private static final Logger _logger = Logger.getLogger(MultipleBrokersFailoverTest.class);
+
+    private static final String FAILOVER_VIRTUAL_HOST = "failover";
+    private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover";
+    private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
+    private static final int FAILOVER_RETRIES = 1;
+    private static final int FAILOVER_CONNECTDELAY = 1000;
+    private int[] _brokerPorts;
+    private AMQConnectionURL _connectionURL;
+    private Connection _connection;
+    private CountDownLatch _failoverComplete;
+    private CountDownLatch _failoverStarted;
+    private Session _consumerSession;
+    private Destination _destination;
+    private MessageConsumer _consumer;
+    private Session _producerSession;
+    private MessageProducer _producer;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        int port = findFreePort();
+        _brokerPorts = new int[4];
+        _connectionURL = new AMQConnectionURL("amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST
+                + "?&failover='roundrobin?cyclecount='1''");
+
+        // we need to create 4 brokers:
+        // 1st broker will be running in the test JVM and will not have a failover host (only a tcp connection will be established, the amqp connection will be closed)
+        // 2nd broker will be spawned in a separate JVM and should have a failover host (the amqp connection should be established)
+        // 3rd broker will be spawned in a separate JVM and should not have a failover host (only a tcp connection will be established, the amqp connection will be closed)
+        // 4th broker will be spawned in a separate JVM and should have a failover host (the amqp connection should be established)
+
+        // the test should connect to the second broker first and fail over to the fourth broker
+        // after an unsuccessful attempt to establish the connection to the 3rd broker
+        for (int i = 0; i < _brokerPorts.length; i++)
+        {
+            if (i > 0)
+            {
+                port = getNextAvailable(port + 1);
+            }
+            _brokerPorts[i] = port;
+
+            XMLConfiguration testConfiguration = new XMLConfiguration();
+            testConfiguration.addProperty("management.enabled", "false");
+
+            XMLConfiguration testVirtualhosts = new XMLConfiguration();
+            String host = null;
+            if (i == 1 || i == _brokerPorts.length - 1)
+            {
+                host = FAILOVER_VIRTUAL_HOST;
+            }
+            else
+            {
+                host = NON_FAILOVER_VIRTUAL_HOST;
+            }
+            testVirtualhosts.addProperty("virtualhost.name", host);
+            testVirtualhosts.addProperty("virtualhost." + host + ".store.class", getTestProfileMessageStoreClassName());
+            testVirtualhosts.addProperty(
+                    "virtualhost." + host + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, "${QPID_WORK}/"
+                            + host);
+
+            startBroker(port, testConfiguration, testVirtualhosts);
+            revertSystemProperties();
+
+            _connectionURL.addBrokerDetails(new AMQBrokerDetails(String.format(BROKER_PORTION_FORMAT, port,
+                    FAILOVER_CONNECTDELAY, FAILOVER_RETRIES)));
+        }
+        _connection = getConnection(_connectionURL);
+        ((AMQConnection) _connection).setConnectionListener(this);
+        _failoverComplete = new CountDownLatch(1);
+        _failoverStarted = new CountDownLatch(1);
+    }
+
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            for (int i = 0; i < _brokerPorts.length; i++)
+            {
+                if (_brokerPorts[i] > 0)
+                {
+                    stopBrokerSafely(_brokerPorts[i]);
+                    FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
+                }
+            }
+
+        }
+    }
+
+    public void startBroker() throws Exception
+    {
+        // no-op: stops super.setUp() from starting a broker; this test starts its own brokers in setUp()
+    }
+
+    public void testFailoverOnBrokerKill() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        assertConnectionPort(_brokerPorts[1]);
+
+        assertSendReceive(0);
+
+        killBroker(_brokerPorts[1]);
+
+        awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2);
+        assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount());
+
+        assertSendReceive(2);
+        assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]);
+    }
+
+    public void testFailoverOnBrokerStop() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        assertConnectionPort(_brokerPorts[1]);
+
+        assertSendReceive(0);
+
+        stopBroker(_brokerPorts[1]);
+
+        awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * 2);
+        assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount());
+
+        assertSendReceive(1);
+        assertConnectionPort(_brokerPorts[_brokerPorts.length - 1]);
+    }
+
+    private void assertConnectionPort(int brokerPort)
+    {
+        int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort();
+        assertEquals("Unexpected broker port", brokerPort, connectionPort);
+    }
+
+    private void assertSendReceive(int index) throws JMSException
+    {
+        Message message = createNextMessage(_producerSession, index);
+        _producer.send(message);
+        if (_producerSession.getTransacted())
+        {
+            _producerSession.commit();
+        }
+        Message receivedMessage = _consumer.receive(1000l);
+        assertReceivedMessage(receivedMessage, index);
+        if (_consumerSession.getTransacted())
+        {
+            _consumerSession.commit();
+        }
+    }
+
+    private void awaitForFailoverCompletion(long delay)
+    {
+        _logger.info("Awaiting Failover completion..");
+        try
+        {
+            if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
+            {
+                _logger.warn("Test thread stack:\n\n" + TestUtils.dumpThreads());
+                fail("Failover did not complete");
+            }
+        }
+        catch (InterruptedException e)
+        {
+            fail("Test was interrupted:" + e.getMessage());
+        }
+    }
+
+    private void assertReceivedMessage(Message receivedMessage, int messageIndex)
+    {
+        assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
+        assertTrue(
+                "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage,
+                receivedMessage instanceof TextMessage);
+    }
+
+    private void init(int acknowledgeMode, boolean startConnection) throws JMSException
+    {
+        boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
+
+        _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
+        _destination = _consumerSession.createQueue(getTestQueueName());
+        _consumer = _consumerSession.createConsumer(_destination);
+
+        if (startConnection)
+        {
+            _connection.start();
+        }
+
+        _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
+        _producer = _producerSession.createProducer(_destination);
+
+    }
+
+    @Override
+    public void bytesSent(long count)
+    {
+    }
+
+    @Override
+    public void bytesReceived(long count)
+    {
+    }
+
+    @Override
+    public boolean preFailover(boolean redirect)
+    {
+        _failoverStarted.countDown();
+        return true;
+    }
+
+    @Override
+    public boolean preResubscribe()
+    {
+        return true;
+    }
+
+    @Override
+    public void failoverComplete()
+    {
+        _failoverComplete.countDown();
+    }
+}

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java Mon Sep 10 12:39:44 2012
@@ -82,28 +82,7 @@ public class InternalBrokerHolder implem
     @Override
     public String dumpThreads()
     {
-        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
-        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
-        StringBuilder dump = new StringBuilder();
-        dump.append(String.format("%n"));
-        for (ThreadInfo threadInfo : threadInfos)
-        {
-            dump.append(threadInfo);
-        }
-
-        long[] deadLocks = threadMXBean.findDeadlockedThreads();
-        if (deadLocks != null && deadLocks.length > 0)
-        {
-            ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks);
-            dump.append(String.format("%n"));
-            dump.append("Deadlock is detected!");
-            dump.append(String.format("%n"));
-            for (ThreadInfo threadInfo : deadlockedThreads)
-            {
-                dump.append(threadInfo);
-            }
-        }
-        return dump.toString();
+        return TestUtils.dumpThreads();
     }
 
     @Override

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java?rev=1382799&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestUtils.java Mon Sep 10 12:39:44 2012
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.utils;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+public class TestUtils
+{
+    public static String dumpThreads()
+    {
+        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
+        StringBuilder dump = new StringBuilder();
+        dump.append(String.format("%n"));
+        for (ThreadInfo threadInfo : threadInfos)
+        {
+            dump.append(threadInfo);
+        }
+
+        long[] deadLocks = threadMXBean.findDeadlockedThreads();
+        if (deadLocks != null && deadLocks.length > 0)
+        {
+            ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks);
+            dump.append(String.format("%n"));
+            dump.append("Deadlock is detected!");
+            dump.append(String.format("%n"));
+            for (ThreadInfo threadInfo : deadlockedThreads)
+            {
+                dump.append(threadInfo);
+            }
+        }
+        return dump.toString();
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1382799&r1=1382798&r2=1382799&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Mon Sep 10 12:39:44 2012
@@ -184,3 +184,6 @@ org.apache.qpid.disttest.*
 // Exclude java broker REST API tests
 org.apache.qpid.server.management.plugin.servlet.rest.*
 org.apache.qpid.systest.rest.acl.*
+
+// Exclude failover tests requiring virtual host functionality
+org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org