You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/14 21:19:43 UTC
svn commit: r1409375 - in
/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch:
api/AmqpConnection.java impl/AmqpTransport.java
Author: chirino
Date: Wed Nov 14 20:19:43 2012
New Revision: 1409375
URL: http://svn.apache.org/viewvc?rev=1409375&view=rev
Log:
Fixing up amqp transport client disconnects.
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java Wed Nov 14 20:19:43 2012
@@ -21,6 +21,7 @@ import org.apache.activemq.apollo.amqp.h
import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpTransport;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.SessionImpl;
@@ -176,4 +177,23 @@ public class AmqpConnection extends Amqp
public ProtocolTracer getProtocolTracer() {
return transport.getProtocolTracer();
}
+
+ /**
+ * Once the remote end closes, the transport is disconnected.
+ */
+ @Override
+ public void close() {
+ super.close();
+ onRemoteClose(new Callback<EndpointError>() {
+ @Override
+ public void onSuccess(EndpointError value) {
+ disconnect();
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ disconnect();
+ }
+ });
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1409375&r1=1409374&r2=1409375&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java Wed Nov 14 20:19:43 2012
@@ -505,6 +505,7 @@ public class AmqpTransport extends Watch
state = DISCONNECTED;
hawtdispatchTransport = null;
protonTransport = null;
+ fireWatches();
}
});
}
@@ -540,7 +541,7 @@ public class AmqpTransport extends Watch
addWatch(new Watch() {
@Override
public boolean execute() {
- if( state!=DISCONNECTED ) {
+ if( state==DISCONNECTED ) {
cb.onSuccess(null);
return true;
}