You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2011/02/17 15:50:28 UTC
svn commit: r1071631 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/
Author: grkvlt
Date: Thu Feb 17 14:50:28 2011
New Revision: 1071631
URL: http://svn.apache.org/viewvc?rev=1071631&view=rev
Log:
QPID-3008: Fix failover behaviour in 0-10 for QueueBrowserAutoAckTest
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Feb 17 14:50:28 2011
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)
{
- ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
s.resubscribe();
}
}
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
try
@@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10
ConnectionClose close = exc.getClose();
if (close == null)
{
+ _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
_conn.failoverPrep();
- _qpidConnection.resume();
+ _conn.resubscribeSessions();
_conn.fireFailoverComplete();
return;
}
@@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10
{
_logger.error("error during failover", e);
}
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
}
ExceptionListener listener = _conn._exceptionListener;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Feb 17 14:50:28 2011
@@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQ
public void resumed(Session ssn)
{
_qpidConnection = ssn.getConnection();
- try
- {
- resubscribe();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
}
public void message(Session ssn, MessageTransfer xfr)
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1071631&r1=1071630&r2=1071631&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java Thu Feb 17 14:50:28 2011
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.test.client;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -41,8 +41,6 @@ import java.util.Random;
public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
- private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class);
-
protected Connection _clientConnection;
protected Session _clientSession;
protected Queue _queue;
@@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest ext
{
super.setUp();
-
//Create Client
_clientConnection = getConnection();
-
_clientConnection.start();
setupSession();
@@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest ext
closeBrowserBeforeAfterGetNext(10);
validate(messages);
-
}
/**
@@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest ext
{
int messages = 5;
-
sendMessages("connection1", messages);
if (!CLUSTERED)
{
sendMessages("connection2", messages);
}
-
checkQueueDepth(messages);
-
_logger.info("Creating Queue Browser");
-
QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
long queueDepth = 0;
@@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest ext
}
catch (AMQException e)
{
+ fail("Caught exception getting queue depth: " + e.getMessage());
}
assertEquals("Session reports Queue depth not as expected", messages, queueDepth);
-
int msgCount = 0;
-
int failPoint = 0;
failPoint = new Random().nextInt(messages) + 1;
Enumeration msgs = queueBrowser.getEnumeration();
-
while (msgs.hasMoreElements())
{
msgs.nextElement();
@@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest ext
//Validate all messages still on Broker 1
validate(messages);
}
-
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org