You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 20:40:55 UTC

svn commit: r468490 - in /incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/ qpid/server/protocol/

Author: chirino
Date: Fri Oct 27 11:40:55 2006
New Revision: 468490

URL: http://svn.apache.org/viewvc?view=rev&rev=468490
Log:
Better error handling and connection shutdown processing.

Modified:
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java
    incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpConnection.java Fri Oct 27 11:40:55 2006
@@ -24,15 +24,11 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.amqp.command.AMQDataBlock;
-import org.apache.activemq.amqp.command.ConnectionCloseBody;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.26 $
@@ -40,9 +36,6 @@
 @SuppressWarnings("unchecked")
 public class AmqpConnection implements Service {
 
-    private static final Log transportLog = LogFactory.getLog(AmqpConnection.class.getName() + ".Transport");
-    private static final Log serviceLog = LogFactory.getLog(AmqpConnection.class.getName() + ".Service");
-    
     // What this connection is attached to.
     protected final AmqpTransportConnector connector;
     private final Transport transport;
@@ -50,10 +43,6 @@
     // Handles async dispatching.
     protected final TaskRunner taskRunner;
     protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
-
-    // Exception handling.
-    private boolean inServiceException=false;
-    protected IOException transportException;    
     
     protected boolean disposed=false;
 	private String remoteAddress;
@@ -77,14 +66,10 @@
         
         this.transport.setTransportListener(new DefaultTransportListener() {
             public void onCommand(Object o) {
-				try {
-					session.process((AMQDataBlock) o);
-				} catch (Exception e) {
-					AmqpConnection.this.onException(e);
-				}
+				session.process((AMQDataBlock) o);
             }
             public void onException(IOException exception) {
-                onTransportException(exception);
+                session.onTransportException(exception);
             }
         });
                 
@@ -95,7 +80,11 @@
 				            return false;
 				        } else {
 				            AMQDataBlock command = (AMQDataBlock) dispatchQueue.remove(0);
-				            oneway(command);
+				            try {
+								oneway(command);
+							} catch (IOException e) {
+								session.onTransportException(e);
+							}
 				            return true;
 				        }
 					}
@@ -123,50 +112,12 @@
         if( taskRunner!=null )
             taskRunner.shutdown();
     }
-    
-    public void onTransportException(IOException e) {
-        if( !disposed ) {
-            transportException = e;	
-            if( transportLog.isDebugEnabled() )
-                transportLog.debug("Transport failed: "+e,e);
-            ServiceSupport.dispose(this);
-        }
-    }
-        
-    public void onException(Throwable e) {
-    	
-        // are we a transport exception such as not being able to dispatch
-        // synchronously to a transport
-        if (e instanceof IOException) {
-            onTransportException((IOException) e);
-        }
-                
-        else if( !disposed && !inServiceException ) {
-            inServiceException = true;
-                try {
-                if( serviceLog.isDebugEnabled() )
-                	serviceLog.debug("Async error occurred: "+e,e);
-                
-                	ConnectionCloseBody cc = new ConnectionCloseBody();
-                	cc.setReplyCode(200);
-                	cc.setReplyText(e.getMessage());
-                	onewayAsync(cc);
-
-            } finally {
-                inServiceException = false;
-            }
-        } 
-    }
-        
-    protected void oneway(AMQDataBlock command) {
-        try {
-            transport.oneway(command);
-        } catch (IOException e) {
-            onException(e);
-        }
+                    
+    protected void oneway(AMQDataBlock command) throws IOException {
+        transport.oneway(command);
     }
 
-    public void onewayAsync(AMQDataBlock frame) {
+    public void onewayAsync(AMQDataBlock frame) throws IOException {
         if( taskRunner==null ) {
             oneway( frame );
         } else {

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/broker/AmqpSession.java Fri Oct 27 11:40:55 2006
@@ -1,5 +1,6 @@
 package org.apache.activemq.amqp.broker;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -177,6 +178,7 @@
 				throw e;
 			}
 		}
+		public String toString() { return "NOT STARTED";}
 	};
 
 	/**
@@ -233,7 +235,7 @@
 
 			}
 		}
-
+		public String toString() { return "NOT AUTHENTICATED";}
 	};
 
 	/**
@@ -246,6 +248,7 @@
 			startHeartbeats(body.heartbeat);
 			return null;
 		}
+		public String toString() { return "NOT TUNED";}
 	};
 
 	/**
@@ -269,6 +272,7 @@
 			frameHandler = CONNECTION_OPEN_HANDLER;
 			return response;
 		}
+		public String toString() { return "NOT OPENED";}
 	};
 
 	/**
@@ -333,11 +337,10 @@
 		@Override
 		public Object processConnectionCloseBody(ConnectionCloseBody body) throws Exception {
 			log.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + body.getReplyText());
+			
+			frameHandler = CONNECTION_CLOSING_HANDLER;
 			ConnectionCloseOkBody response = new ConnectionCloseOkBody();
 			response.setChannel(body.getChannel());
-			frameHandler = CONNECTION_CLOSING_HANDLER;
-
-			// TODO: may need to shut down connection here.
 			return response;
 		}
 
@@ -658,6 +661,7 @@
 				return onChannelError(body, AMQConstant.INTERNAL_ERROR, "Failed to commit: " + e.getMessage());
 			}
 		}
+		public String toString() { return "OPEN";}
 
 	};
 
@@ -671,12 +675,15 @@
 			connection.stop();
 			return null;
 		}
+		public String toString() { return "CLOSING";}
 	};
 
 	/**
 	 * Handles commands that can be issued when the connection is closed
 	 */
-	private final static Visitor CONNECTION_CLOSED_HANDLER = new Visitor();
+	private final static Visitor CONNECTION_CLOSED_HANDLER = new Visitor() {
+		public String toString() { return "CLOSED";}
+	};
 
 	/**
 	 * This frame handler changes when the connection state changes.
@@ -688,7 +695,11 @@
 	}
 
 	public void writeFrame(AMQDataBlock frame) {
-		connection.onewayAsync(frame);
+		try {
+			connection.onewayAsync(frame);
+		} catch (IOException e) {
+			onTransportException(e);
+		}
 	}
 
 	public void process(AMQDataBlock frame) {
@@ -697,6 +708,20 @@
             if (response != null) {
                 writeFrame(response);
             }
+		} catch (AMQException e) {
+			frameHandler = CONNECTION_CLOSING_HANDLER;
+			ConnectionCloseBody close = new ConnectionCloseBody();
+			close.setReplyCode(e.getErrorCode());
+			close.setReplyText(e.getMessage());
+			if( frame instanceof MethodBody ) {
+				MethodBody mb = (MethodBody) frame;
+				close.setClassId(mb.getClazz());
+				close.setMethodId(mb.getMethod());
+			} else {
+				close.setClassId(0);
+				close.setMethodId(0);
+			}
+			writeFrame(close);
 		} catch (Exception e) {
 			frameHandler = CONNECTION_CLOSING_HANDLER;
 			ConnectionCloseBody close = new ConnectionCloseBody();
@@ -710,8 +735,24 @@
 				close.setClassId(0);
 				close.setMethodId(0);
 			}
-            writeFrame(close);
+			writeFrame(close);
+		}
+	}
+	
+	public void onTransportException(IOException exception) {
+		
+		if( frameHandler!=CONNECTION_CLOSING_HANDLER && frameHandler!=CONNECTION_CLOSED_HANDLER ) {
+			log.info("The "+getKey()+" client's "+frameHandler+" connection failed.");
+			if( log.isDebugEnabled() ) {
+				log.debug("Connection failure details: "+exception, exception);
+			}
+		}
+		
+		try {
+			connection.stop();
+		} catch (Exception ignore) {
 		}
+		
 	}
 
 	public String getKey() {
@@ -824,5 +865,6 @@
 
 		return close;
 	}
+
 
 }

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/command/CompositeAMQDataBlock.java Fri Oct 27 11:40:55 2006
@@ -47,4 +47,18 @@
 		throw new RuntimeException("Not implemented.");
 	}
 
+	@Override
+	public String toString() {
+        StringBuffer buf = new StringBuffer(getClass().toString());
+        buf.append("{");
+        if(_blocks!=null) {
+        	for (int i = 0; i < _blocks.length; i++) {
+        		if( i!=0 )
+    				buf.append(", ");				
+				buf.append(_blocks[i]);				
+			}
+        }
+        buf.append("}");
+        return buf.toString();
+	}
 }

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/transport/AmqpTransportFactory.java Fri Oct 27 11:40:55 2006
@@ -55,15 +55,16 @@
     
     public Transport serverConfigure(Transport transport, WireFormat wf, HashMap options) throws Exception {
     	((AmqpWireFormat)wf).setSendProtocolInitiation(false);
-    	((AmqpWireFormat)wf).setWaitForProtocolInitiation(true);    	
+    	((AmqpWireFormat)wf).setWaitForProtocolInitiation(true);  
+//    	options.put("trace", "true");
     	transport = compositeConfigure(transport, wf, options);    	
-    	transport = new AmqpServerTransportFilter(transport);
         transport = new MutexTransport(transport);
         return transport;
     }
 
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-    	return super.createTcpTransport(wf, socketFactory, location, localLocation);
+    	TcpTransport transport = super.createTcpTransport(wf, socketFactory, location, localLocation);
+    	return transport;
     }
 
 }

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=468490&r1=468489&r2=468490
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Oct 27 11:40:55 2006
@@ -1,5 +1,7 @@
 package org.apache.qpid.server.protocol;
 
+import java.io.IOException;
+
 import org.apache.activemq.amqp.command.AMQDataBlock;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.AMQChannel;
@@ -9,6 +11,7 @@
     /**
      * Write a datablock, encoding where necessary (e.g. into a sequence of bytes)
      * @param frame the frame to be encoded and written
+     * @throws IOException 
      */
 	void writeFrame(AMQDataBlock frame);