You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/02/09 23:52:14 UTC

svn commit: r1069159 - in /qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport: ClientDelegate.java Connection.java

Author: rajith
Date: Wed Feb  9 22:52:14 2011
New Revision: 1069159

URL: http://svn.apache.org/viewvc?rev=1069159&view=rev
Log:
QPID-3043
Added an extra state, 'RESUMING', to ensure that any new session creation is delayed until the connection has reattached all existing sessions. If a connection is reconnecting, it first goes to the RESUMING state (instead of OPEN) on connection-open-ok. Once the 'resume' method in Connection.java has completed, the state is set to OPEN.

Modified:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1069159&r1=1069158&r2=1069159&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Wed Feb  9 22:52:14 2011
@@ -28,6 +28,7 @@ import org.ietf.jgss.Oid;
 
 import org.apache.qpid.security.UsernamePasswordCallbackHandler;
 import static org.apache.qpid.transport.Connection.State.OPEN;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
 import org.apache.qpid.transport.util.Logger;
 
 import javax.security.sasl.Sasl;
@@ -216,7 +217,14 @@ public class ClientDelegate extends Conn
             }
         }
         
-        conn.setState(OPEN);
+        if (conn.isConnectionResuming())
+        {
+            conn.setState(RESUMING);
+        }
+        else
+        {
+            conn.setState(OPEN);
+        }
     }
 
     @Override

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1069159&r1=1069158&r2=1069159&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Feb  9 22:52:14 2011
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Connection.State.NEW;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.sasl.SaslClient;
@@ -63,7 +65,7 @@ public class Connection extends Connecti
     public static final int MAX_CHANNEL_MAX = 0xFFFF;
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
 
-    public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+    public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
     static class DefaultConnectionListener implements ConnectionListener
     {
@@ -119,7 +121,8 @@ public class Connection extends Connecti
     
     private static final AtomicLong idGenerator = new AtomicLong(0);
     private final long _connectionId = idGenerator.incrementAndGet();
-
+    private static final AtomicBoolean connectionLost = new AtomicBoolean(false);
+    
     public Connection() {}
 
     public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -270,6 +273,8 @@ public class Connection extends Connecti
                 close();
                 throw new ConnectionException("connect() timed out");
             case OPEN:
+            case RESUMING:
+                connectionLost.set(false);
                 break;
             case CLOSED:
                 throw new ConnectionException("connect() aborted");
@@ -475,11 +480,13 @@ public class Connection extends Connecti
                     ssn.resume();
                 }
             }
+            setState(OPEN);
         }
     }
 
     public void exception(ConnectionException e)
     {
+        connectionLost.set(true);
         synchronized (lock)
         {
             switch (state)
@@ -682,5 +689,10 @@ public class Connection extends Connecti
     {
         return securityLayer;
     }
+    
+    public boolean isConnectionResuming()
+    {
+        return connectionLost.get();
+    }
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org