You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/14 21:19:43 UTC

svn commit: r1409375 - in /activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch: api/AmqpConnection.java impl/AmqpTransport.java

Author: chirino
Date: Wed Nov 14 20:19:43 2012
New Revision: 1409375

URL: http://svn.apache.org/viewvc?rev=1409375&view=rev
Log:
Fixing up amqp transport client disconnects.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java Wed Nov 14 20:19:43 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.apollo.amqp.h
 import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpTransport;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.SessionImpl;
@@ -176,4 +177,23 @@ public class AmqpConnection extends Amqp
     public ProtocolTracer getProtocolTracer() {
         return transport.getProtocolTracer();
     }
+
+    /**
+     * Once the remote end, closes the transport is disconnected.
+     */
+    @Override
+    public void close() {
+        super.close();
+        onRemoteClose(new Callback<EndpointError>() {
+            @Override
+            public void onSuccess(EndpointError value) {
+                disconnect();
+            }
+
+            @Override
+            public void onFailure(Throwable value) {
+                disconnect();
+            }
+        });
+    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java Wed Nov 14 20:19:43 2012
@@ -505,6 +505,7 @@ public class AmqpTransport extends Watch
                         state = DISCONNECTED;
                         hawtdispatchTransport = null;
                         protonTransport = null;
+                        fireWatches();
                     }
                 });
             }
@@ -540,7 +541,7 @@ public class AmqpTransport extends Watch
         addWatch(new Watch() {
             @Override
             public boolean execute() {
-                if( state!=DISCONNECTED ) {
+                if( state==DISCONNECTED ) {
                     cb.onSuccess(null);
                     return true;
                 }