You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/02/09 23:52:14 UTC
svn commit: r1069159 - in
/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport:
ClientDelegate.java Connection.java
Author: rajith
Date: Wed Feb 9 22:52:14 2011
New Revision: 1069159
URL: http://svn.apache.org/viewvc?rev=1069159&view=rev
Log:
QPID-3043
Added the extra state 'RESUMING', to ensure that any new session creation is delayed until the connection is able to reattach all existing sessions. If a connection is reconnecting then it will first go to RESUMING state (instead of OPEN) in connection-open-ok. Once the 'resume' method in Connection.java is completed the state will be set to OPEN.
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1069159&r1=1069158&r2=1069159&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Wed Feb 9 22:52:14 2011
@@ -28,6 +28,7 @@ import org.ietf.jgss.Oid;
import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
import org.apache.qpid.transport.util.Logger;
import javax.security.sasl.Sasl;
@@ -216,7 +217,14 @@ public class ClientDelegate extends Conn
}
}
- conn.setState(OPEN);
+ if (conn.isConnectionResuming())
+ {
+ conn.setState(RESUMING);
+ }
+ else
+ {
+ conn.setState(OPEN);
+ }
}
@Override
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1069159&r1=1069158&r2=1069159&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Feb 9 22:52:14 2011
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
@@ -63,7 +65,7 @@ public class Connection extends Connecti
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
- public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
{
@@ -119,7 +121,8 @@ public class Connection extends Connecti
private static final AtomicLong idGenerator = new AtomicLong(0);
private final long _connectionId = idGenerator.incrementAndGet();
-
+ private static final AtomicBoolean connectionLost = new AtomicBoolean(false);
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -270,6 +273,8 @@ public class Connection extends Connecti
close();
throw new ConnectionException("connect() timed out");
case OPEN:
+ case RESUMING:
+ connectionLost.set(false);
break;
case CLOSED:
throw new ConnectionException("connect() aborted");
@@ -475,11 +480,13 @@ public class Connection extends Connecti
ssn.resume();
}
}
+ setState(OPEN);
}
}
public void exception(ConnectionException e)
{
+ connectionLost.set(true);
synchronized (lock)
{
switch (state)
@@ -682,5 +689,10 @@ public class Connection extends Connecti
{
return securityLayer;
}
+
+ public boolean isConnectionResuming()
+ {
+ return connectionLost.get();
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org