You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/21 17:12:33 UTC

svn commit: r387566 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/ main/java/org/apache/activemq/openwire/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/activeio/ main/java/or...

Author: chirino
Date: Tue Mar 21 08:12:31 2006
New Revision: 387566

URL: http://svn.apache.org/viewcvs?rev=387566&view=rev
Log:
http://jira.activemq.org/jira/browse/AMQ-643

The maxInactivityDuration is now negociated using the WireFormatInfo.  This makes it easier to configure connections since client and server configs do not HAVE to match up excactly.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Tue Mar 21 08:12:31 2006
@@ -240,6 +240,17 @@
     public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
         setProperty("TightEncodingEnabled", tightEncodingEnabled ? Boolean.TRUE : Boolean.FALSE);
     }
+    
+    /**
+     * @throws IOException 
+     */
+    public long getMaxInactivityDuration() throws IOException {
+        Long l = (Long) getProperty("MaxInactivityDuration");
+        return l == null ? 0 : l.longValue();
+    }
+    public void seMaxInactivityDuration(long maxInactivityDuration) throws IOException {
+        setProperty("MaxInactivityDuration", new Long(maxInactivityDuration));
+    }
 
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processWireFormat(this);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Tue Mar 21 08:12:31 2006
@@ -35,7 +35,8 @@
     private boolean cacheEnabled=true;
     private boolean tightEncodingEnabled=true;
     private boolean sizePrefixDisabled=false;
-
+    private long maxInactivityDuration=30*1000;
+    
     public WireFormat createWireFormat() {
 		WireFormatInfo info = new WireFormatInfo();
 		info.setVersion(version);
@@ -46,6 +47,7 @@
 			info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
 			info.setTightEncodingEnabled(tightEncodingEnabled);
 			info.setSizePrefixDisabled(sizePrefixDisabled);
+            info.seMaxInactivityDuration(maxInactivityDuration);
 		} catch (Exception e) {
 			IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
             ise.initCause(e);
@@ -104,4 +106,12 @@
 	public void setSizePrefixDisabled(boolean sizePrefixDisabled) {
 		this.sizePrefixDisabled = sizePrefixDisabled;
 	}
+
+    public long getMaxInactivityDuration() {
+        return maxInactivityDuration;
+    }
+
+    public void setMaxInactivityDuration(long maxInactivityDuration) {
+        this.maxInactivityDuration = maxInactivityDuration;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Mar 21 08:12:31 2006
@@ -20,6 +20,7 @@
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,12 +32,13 @@
  * 
  * @version $Revision$
  */
-public class InactivityMonitor extends TransportFilter implements Runnable {
+public class InactivityMonitor extends TransportFilter {
 
     private final Log log = LogFactory.getLog(InactivityMonitor.class);
     
-    private final long maxInactivityDuration;
-    private byte readCheckIteration=0;
+    private WireFormatInfo localWireFormatInfo;
+    private WireFormatInfo remoteWireFormatInfo;
+    private boolean monitorStarted=false;
 
     private final AtomicBoolean commandSent=new AtomicBoolean(false);
     private final AtomicBoolean inSend=new AtomicBoolean(false);
@@ -44,35 +46,29 @@
     private final AtomicBoolean commandReceived=new AtomicBoolean(true);
     private final AtomicBoolean inReceive=new AtomicBoolean(false);
     
-    public InactivityMonitor(Transport next, long maxInactivityDuration) {
+    private final Runnable readChecker = new Runnable() {
+        public void run() {
+            readCheck();
+        }
+    };
+    
+    private final Runnable writeChecker = new Runnable() {
+        public void run() {
+            writeCheck();
+        }
+    };
+    
+    
+    public InactivityMonitor(Transport next) {
         super(next);
-        this.maxInactivityDuration = maxInactivityDuration;
-    }
-        
-    public void start() throws Exception {
-        next.start();
-        Scheduler.executePeriodically(this, maxInactivityDuration/2);
     }
-    
+
     public void stop() throws Exception {
-        Scheduler.cancel(this);
+        stopMonitorThreads();
         next.stop();
     }
-    
-    synchronized public void run() {
-        switch(readCheckIteration) {
-        case 0:
-            writeCheck();
-            readCheckIteration++;
-            break;
-        case 1:
-            readCheck();
-            writeCheck();
-            readCheckIteration=0;
-            break;
-        }        
-    }
-    
+
+        
     private void writeCheck() {
         if( inSend.get() ) {
             log.debug("A send is in progress");
@@ -82,7 +78,7 @@
         if( !commandSent.get() ) {
             log.debug("No message sent since last write check, sending a KeepAliveInfo");
             try {
-                next.oneway(new KeepAliveInfo());
+                next.oneway(new KeepAliveInfo());                
             } catch (IOException e) {
                 onException(e);
             }
@@ -113,18 +109,35 @@
     public void onCommand(Command command) {
         inReceive.set(true);
         try {
+            if( command.isWireFormatInfo() ) {
+                synchronized( this ) {
+                    remoteWireFormatInfo = (WireFormatInfo) command;
+                    try {
+                        startMonitorThreads();
+                    } catch (IOException e) {
+                        onException(e);
+                    }
+                }
+            }
             getTransportListener().onCommand(command);
         } finally {
             inReceive.set(false);
             commandReceived.set(true);
         }
     }
+
     
     public void oneway(Command command) throws IOException {
         // Disable inactivity monitoring while processing a command.
         inSend.set(true);
         commandSent.set(true);
         try {
+            if( command.isWireFormatInfo() ) {
+                synchronized( this ) {
+                    localWireFormatInfo = (WireFormatInfo) command;
+                    startMonitorThreads();
+                }
+            }
             next.oneway(command);
         } finally {
             inSend.set(false);
@@ -132,7 +145,37 @@
     }
     
     public void onException(IOException error) {
-        Scheduler.cancel(this);
+        stopMonitorThreads();
         getTransportListener().onException(error);
     }
+    
+    
+    synchronized private void startMonitorThreads() throws IOException {
+        if( monitorStarted ) 
+            return;
+        if( localWireFormatInfo == null )
+            return;
+        if( remoteWireFormatInfo == null )
+            return;
+        
+        long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
+        if( l > 0 ) {
+            Scheduler.executePeriodically(writeChecker, l/2);
+            Scheduler.executePeriodically(readChecker, l);
+            monitorStarted=true;        
+        }
+    }
+    
+    /**
+     * 
+     */
+    synchronized private void stopMonitorThreads() {
+        if( monitorStarted ) {
+            Scheduler.cancel(readChecker);
+            Scheduler.cancel(writeChecker);
+            monitorStarted=false;
+        }
+    }
+    
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -230,11 +230,13 @@
         if( activeIOTransport.isTrace() ) {
             transport = new TransportLogger(transport);
         }
+        
+        transport = new InactivityMonitor(transport);
+
         if( format instanceof OpenWireFormat ) {
         	transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
         }
         
-        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
         return transport;
@@ -279,10 +281,12 @@
         if( activeIOTransport.isTrace() ) {
             transport = new TransportLogger(transport);
         }
+        
+        transport = new InactivityMonitor(transport);
+        
         if( format instanceof OpenWireFormat ) {
         	transport = new WireFormatNegotiator(transport, (OpenWireFormat) format, activeIOTransport.getMinmumWireFormatVersion());
         }
-        transport = new InactivityMonitor(transport, activeIOTransport.getMaxInactivityDuration());
         return transport;        
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Mar 21 08:12:31 2006
@@ -56,7 +56,6 @@
     private boolean trace;
     private boolean useLocalHost = true;
     private int minmumWireFormatVersion;
-    private long maxInactivityDuration = 0; //30000;
     private InetSocketAddress socketAddress;
 
     
@@ -204,17 +203,6 @@
      */
     public void setSoTimeout(int soTimeout) {
         this.soTimeout = soTimeout;
-    }
-
-    public long getMaxInactivityDuration() {
-        return maxInactivityDuration;
-    }
-
-    /**
-     * Sets the maximum inactivity duration
-     */
-    public void setMaxInactivityDuration(long maxInactivityDuration) {
-        this.maxInactivityDuration = maxInactivityDuration;
     }
 
     public int getConnectionTimeout() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -64,15 +64,13 @@
             transport = new TransportLogger(transport);
         }
 
+        transport = new InactivityMonitor(transport);
+
         // Only need the OpenWireFormat if using openwire
         if( format instanceof OpenWireFormat ) {
         	transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
         }
         
-        if( tcpTransport.getMaxInactivityDuration() > 0 ) {
-            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
-        }
-        
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);
         return transport;
@@ -85,14 +83,13 @@
             transport = new TransportLogger(transport);
         }
 
+        transport = new InactivityMonitor(transport);
+
         // Only need the OpenWireFormat if using openwire
         if( format instanceof OpenWireFormat ) {
         	transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
         }
         
-        if( tcpTransport.getMaxInactivityDuration() > 0 ) {
-            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
-        }
         return transport;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Tue Mar 21 08:12:31 2006
@@ -48,7 +48,7 @@
     private int backlog = 5000;
     private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
     private TcpTransportFactory transportFactory = new TcpTransportFactory();
-    private long maxInactivityDuration = 0; //30000;
+    private long maxInactivityDuration = 30000;
     private int minmumWireFormatVersion;
     private boolean trace;
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Tue Mar 21 08:12:31 2006
@@ -57,7 +57,6 @@
     private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
     private ReplayBuffer replayBuffer;
     private int datagramSize = 4 * 1024;
-    private long maxInactivityDuration = 0; // 30000;
     private SocketAddress targetAddress;
     private SocketAddress originalTargetAddress;
     private DatagramChannel channel;
@@ -223,23 +222,12 @@
         this.trace = trace;
     }
 
-    public long getMaxInactivityDuration() {
-        return maxInactivityDuration;
-    }
-
     public int getDatagramSize() {
         return datagramSize;
     }
 
     public void setDatagramSize(int datagramSize) {
         this.datagramSize = datagramSize;
-    }
-
-    /**
-     * Sets the maximum inactivity duration
-     */
-    public void setMaxInactivityDuration(long maxInactivityDuration) {
-        this.maxInactivityDuration = maxInactivityDuration;
     }
 
     public boolean isUseLocalHost() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Tue Mar 21 08:12:31 2006
@@ -81,14 +81,12 @@
             transport = new TransportLogger(transport);
         }
 
+        transport = new InactivityMonitor(transport);
+
         if (format instanceof OpenWireFormat) {
             transport = configureClientSideNegotiator(transport, format, udpTransport);
         }
 
-        if (udpTransport.getMaxInactivityDuration() > 0) {
-            transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
-        }
-
         return transport;
     }
 
@@ -115,12 +113,10 @@
             transport = new TransportLogger(transport);
         }
 
+        transport = new InactivityMonitor(transport);
+
         if (!acceptServer && format instanceof OpenWireFormat) {
             transport = configureClientSideNegotiator(transport, format, udpTransport);
-        }
-
-        if (udpTransport.getMaxInactivityDuration() > 0) {
-            transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
         }
 
         // deal with fragmentation

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Tue Mar 21 08:12:31 2006
@@ -134,10 +134,7 @@
     }
 
     protected Transport configureTransport(Transport transport) {
-        if (serverTransport.getMaxInactivityDuration() > 0) {
-            transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
-        }
-
+        transport = new InactivityMonitor(transport);
         getAcceptListener().onAccept(transport);
         return transport;
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java?rev=387566&r1=387565&r2=387566&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java Tue Mar 21 08:12:31 2006
@@ -1,10 +1,28 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.transport.tcp;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -32,16 +50,17 @@
     public Runnable serverRunOnCommand;
     public Runnable clientRunOnCommand;
     
-    public long clientInactivityLimit;
-    public long serverInactivityLimit;
-
-    
     protected void setUp() throws Exception {
         super.setUp();
-        server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+serverInactivityLimit));
-        server.setAcceptListener(this);
-        server.start();
-        clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&maxInactivityDuration="+clientInactivityLimit));
+        startTransportServer();
+    }
+
+    /**
+     * @throws Exception
+     * @throws URISyntaxException
+     */
+    private void startClient() throws Exception, URISyntaxException {
+        clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
         clientTransport.setTransportListener(new TransportListener() {
             public void onCommand(Command command) {
                 clientReceiveCount.incrementAndGet();
@@ -62,18 +81,37 @@
             }});
         clientTransport.start();
     }
+
+    /**
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    private void startTransportServer() throws IOException, URISyntaxException, Exception {
+        server = TransportFactory.bind("localhost", new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
+        server.setAcceptListener(this);
+        server.start();
+    }
     
     protected void tearDown() throws Exception {
         ignoreClientError.set(true);
         ignoreServerError.set(true);
-        clientTransport.stop();
-        serverTransport.stop();
-        server.stop();
+        try {
+            if( clientTransport!=null )
+                clientTransport.stop();
+            if( serverTransport!=null )
+                serverTransport.stop();
+            if( server!=null )
+                server.stop();
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
         super.tearDown();
     }
     
     public void onAccept(Transport transport) {
         try {
+            System.out.println("["+getName()+"] Server Accepted a Connection");
             serverTransport = transport;
             serverTransport.setTransportListener(new TransportListener() {
                 public void onCommand(Command command) {
@@ -103,12 +141,35 @@
         error.printStackTrace();
     }
 
-    public void initCombosForTestClientHang() {
-        addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000*60)});
-        addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
-    }
     public void testClientHang() throws Exception {
         
+        // 
+        // Manually create a client transport so that it does not send KeepAlive packets.
+        // this should simulate a client hang.
+        clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616"));
+        clientTransport.setTransportListener(new TransportListener() {
+            public void onCommand(Command command) {
+                clientReceiveCount.incrementAndGet();
+                if( clientRunOnCommand !=null ) {
+                    clientRunOnCommand.run();
+                }
+            }
+            public void onException(IOException error) {
+                if( !ignoreClientError.get() ) {
+                    System.out.println("Client transport error:");
+                    error.printStackTrace();
+                    clientErrorCount.incrementAndGet();
+                }
+            }
+            public void transportInterupted() {
+            }
+            public void transportResumed() {
+            }});
+        clientTransport.start();
+        WireFormatInfo info = new WireFormatInfo();
+        info.seMaxInactivityDuration(1000);
+        clientTransport.oneway(info);
+        
         assertEquals(0, serverErrorCount.get());
         assertEquals(0, clientErrorCount.get());
         
@@ -119,42 +180,45 @@
         assertTrue(serverErrorCount.get()>0);
     }
     
-    public void initCombosForTestNoClientHang() {
-        addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
-        addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
-    }
     public void testNoClientHang() throws Exception {
+        startClient();
         
         assertEquals(0, serverErrorCount.get());
         assertEquals(0, clientErrorCount.get());
         
         Thread.sleep(4000);
         
-        if( clientErrorCount.get() > 0 )
-        	assertEquals(0, clientErrorCount.get());
-        if( serverErrorCount.get() > 0 )
-        	assertEquals(0, serverErrorCount.get());
+    	assertEquals(0, clientErrorCount.get());
+    	assertEquals(0, serverErrorCount.get());
     }
 
     /**
      * Used to test when a operation blocks.  This should
      * not cause transport to get disconnected.
+     * @throws Exception 
+     * @throws URISyntaxException 
      */
-    public void initCombosForTestNoClientHangWithServerBlock() {
+    public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
+        
+        startClient();
+
         addCombinationValues("clientInactivityLimit", new Object[] { new Long(1000)});
         addCombinationValues("serverInactivityLimit", new Object[] { new Long(1000)});
         addCombinationValues("serverRunOnCommand", new Object[] { new Runnable() {
                 public void run() {
                     try {
                         System.out.println("Sleeping");
-                        Thread.sleep(2000);
+                        Thread.sleep(4000);
                     } catch (InterruptedException e) {
                     }
                 }
             }});
     }
+    
     public void testNoClientHangWithServerBlock() throws Exception {
         
+        startClient();
+
         assertEquals(0, serverErrorCount.get());
         assertEquals(0, clientErrorCount.get());