You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 20:40:55 UTC
svn commit: r468490 - in
/incubator/activemq/sandbox/qpid/src/main/java/org/apache:
activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/
qpid/server/protocol/
Author: chirino
Date: Fri Oct 27 11:40:55 2006
New Revision: 468490
URL: http://svn.apache.org/viewvc?view=rev&rev=468490
Log:
Better error handling and connection shutdown processing.
Modified:
incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java
incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java
incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java
incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java
incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java Fri Oct 27 11:40:55 2006
@@ -24,15 +24,11 @@
import org.apache.activemq.Service;
import org.apache.activemq.amqp.command.AMQDataBlock;
-import org.apache.activemq.amqp.command.ConnectionCloseBody;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.26 $
@@ -40,9 +36,6 @@
@SuppressWarnings("unchecked")
public class AmqpConnection implements Service {
- private static final Log transportLog = LogFactory.getLog(AmqpConnection.class.getName() + ".Transport");
- private static final Log serviceLog = LogFactory.getLog(AmqpConnection.class.getName() + ".Service");
-
// What this connection is attached to.
protected final AmqpTransportConnector connector;
private final Transport transport;
@@ -50,10 +43,6 @@
// Handles ansync dispatching..
protected final TaskRunner taskRunner;
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
-
- // Exception handling.
- private boolean inServiceException=false;
- protected IOException transportException;
protected boolean disposed=false;
private String remoteAddress;
@@ -77,14 +66,10 @@
this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Object o) {
- try {
- session.process((AMQDataBlock) o);
- } catch (Exception e) {
- AmqpConnection.this.onException(e);
- }
+ session.process((AMQDataBlock) o);
}
public void onException(IOException exception) {
- onTransportException(exception);
+ session.onTransportException(exception);
}
});
@@ -95,7 +80,11 @@
return false;
} else {
AMQDataBlock command = (AMQDataBlock) dispatchQueue.remove(0);
- oneway(command);
+ try {
+ oneway(command);
+ } catch (IOException e) {
+ session.onTransportException(e);
+ }
return true;
}
}
@@ -123,50 +112,12 @@
if( taskRunner!=null )
taskRunner.shutdown();
}
-
- public void onTransportException(IOException e) {
- if( !disposed ) {
- transportException = e;
- if( transportLog.isDebugEnabled() )
- transportLog.debug("Transport failed: "+e,e);
- ServiceSupport.dispose(this);
- }
- }
-
- public void onException(Throwable e) {
-
- // are we a transport exception such as not being able to dispatch
- // synchronously to a transport
- if (e instanceof IOException) {
- onTransportException((IOException) e);
- }
-
- else if( !disposed && !inServiceException ) {
- inServiceException = true;
- try {
- if( serviceLog.isDebugEnabled() )
- serviceLog.debug("Async error occurred: "+e,e);
-
- ConnectionCloseBody cc = new ConnectionCloseBody();
- cc.setReplyCode(200);
- cc.setReplyText(e.getMessage());
- onewayAsync(cc);
-
- } finally {
- inServiceException = false;
- }
- }
- }
-
- protected void oneway(AMQDataBlock command) {
- try {
- transport.oneway(command);
- } catch (IOException e) {
- onException(e);
- }
+
+ protected void oneway(AMQDataBlock command) throws IOException {
+ transport.oneway(command);
}
- public void onewayAsync(AMQDataBlock frame) {
+ public void onewayAsync(AMQDataBlock frame) throws IOException {
if( taskRunner==null ) {
oneway( frame );
} else {
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java Fri Oct 27 11:40:55 2006
@@ -1,5 +1,6 @@
package org.apache.activemq.amqp.broker;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -177,6 +178,7 @@
throw e;
}
}
+ public String toString() { return "NOT STARTED";}
};
/**
@@ -233,7 +235,7 @@
}
}
-
+ public String toString() { return "NOT AUTHENTICATED";}
};
/**
@@ -246,6 +248,7 @@
startHeartbeats(body.heartbeat);
return null;
}
+ public String toString() { return "NOT TUNED";}
};
/**
@@ -269,6 +272,7 @@
frameHandler = CONNECTION_OPEN_HANDLER;
return response;
}
+ public String toString() { return "NOT OPENED";}
};
/**
@@ -333,11 +337,10 @@
@Override
public Object processConnectionCloseBody(ConnectionCloseBody body) throws Exception {
log.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + body.getReplyText());
+
+ frameHandler = CONNECTION_CLOSING_HANDLER;
ConnectionCloseOkBody response = new ConnectionCloseOkBody();
response.setChannel(body.getChannel());
- frameHandler = CONNECTION_CLOSING_HANDLER;
-
- // TODO: may need to shut down connection here.
return response;
}
@@ -658,6 +661,7 @@
return onChannelError(body, AMQConstant.INTERNAL_ERROR, "Failed to commit: " + e.getMessage());
}
}
+ public String toString() { return "OPEN";}
};
@@ -671,12 +675,15 @@
connection.stop();
return null;
}
+ public String toString() { return "CLOSING";}
};
/**
* Handles commands that can be issued when the connection is closing
*/
- private final static Visitor CONNECTION_CLOSED_HANDLER = new Visitor();
+ private final static Visitor CONNECTION_CLOSED_HANDLER = new Visitor() {
+ public String toString() { return "CLOSED";}
+ };
/**
* This frame handler changes when the connection state changes.
@@ -688,7 +695,11 @@
}
public void writeFrame(AMQDataBlock frame) {
- connection.onewayAsync(frame);
+ try {
+ connection.onewayAsync(frame);
+ } catch (IOException e) {
+ onTransportException(e);
+ }
}
public void process(AMQDataBlock frame) {
@@ -697,6 +708,20 @@
if (response != null) {
writeFrame(response);
}
+ } catch (AMQException e) {
+ frameHandler = CONNECTION_CLOSING_HANDLER;
+ ConnectionCloseBody close = new ConnectionCloseBody();
+ close.setReplyCode(e.getErrorCode());
+ close.setReplyText(e.getMessage());
+ if( frame instanceof MethodBody ) {
+ MethodBody mb = (MethodBody) frame;
+ close.setClassId(mb.getClazz());
+ close.setMethodId(mb.getMethod());
+ } else {
+ close.setClassId(0);
+ close.setMethodId(0);
+ }
+ writeFrame(close);
} catch (Exception e) {
frameHandler = CONNECTION_CLOSING_HANDLER;
ConnectionCloseBody close = new ConnectionCloseBody();
@@ -710,8 +735,24 @@
close.setClassId(0);
close.setMethodId(0);
}
- writeFrame(close);
+ writeFrame(close);
+ }
+ }
+
+ public void onTransportException(IOException exception) {
+
+ if( frameHandler!=CONNECTION_CLOSING_HANDLER && frameHandler!=CONNECTION_CLOSED_HANDLER ) {
+ log.info("The "+getKey()+" client's "+frameHandler+" connection failed.");
+ if( log.isDebugEnabled() ) {
+ log.debug("Connection failure details: "+exception, exception);
+ }
+ }
+
+ try {
+ connection.stop();
+ } catch (Exception ignore) {
}
+
}
public String getKey() {
@@ -824,5 +865,6 @@
return close;
}
+
}
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java Fri Oct 27 11:40:55 2006
@@ -47,4 +47,18 @@
throw new RuntimeException("Not implemented.");
}
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer(getClass().toString());
+ buf.append("{");
+ if(_blocks!=null) {
+ for (int i = 0; i < _blocks.length; i++) {
+ if( i!=0 )
+ buf.append(", ");
+ buf.append(_blocks[i]);
+ }
+ }
+ buf.append("}");
+ return buf.toString();
+ }
}
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java Fri Oct 27 11:40:55 2006
@@ -55,15 +55,16 @@
public Transport serverConfigure(Transport transport, WireFormat wf, HashMap options) throws Exception {
((AmqpWireFormat)wf).setSendProtocolInitiation(false);
- ((AmqpWireFormat)wf).setWaitForProtocolInitiation(true);
+ ((AmqpWireFormat)wf).setWaitForProtocolInitiation(true);
+// options.put("trace", "true");
transport = compositeConfigure(transport, wf, options);
- transport = new AmqpServerTransportFilter(transport);
transport = new MutexTransport(transport);
return transport;
}
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return super.createTcpTransport(wf, socketFactory, location, localLocation);
+ TcpTransport transport = super.createTcpTransport(wf, socketFactory, location, localLocation);
+ return transport;
}
}
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 27 11:40:55 2006
@@ -1,5 +1,7 @@
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+
import org.apache.activemq.amqp.command.AMQDataBlock;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
@@ -9,6 +11,7 @@
/**
* Write a datablock, encoding where necessary (e.g. into a sequence of bytes)
* @param frame the frame to be encoded and written
+ * @throws IOException
*/
void writeFrame(AMQDataBlock frame);