You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/06/12 15:42:53 UTC

svn commit: r546482 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java

Author: jstrachan
Date: Tue Jun 12 06:42:52 2007
New Revision: 546482

URL: http://svn.apache.org/viewvc?view=rev&rev=546482
Log:
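Applied patch for AMQ-1146: writeCheck()/oneway() now synchronize on writeChecker, and readCheck()/onCommand() on readChecker, so the periodic inactivity checks no longer race with in-progress sends and receives.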

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=546482&r1=546481&r2=546482
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Jun 12 06:42:52 2007
@@ -27,14 +27,14 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Used to make sure that commands are arriving periodically from the peer of the transport.  
- * 
+ * Used to make sure that commands are arriving periodically from the peer of the transport.
+ *
  * @version $Revision$
  */
 public class InactivityMonitor extends TransportFilter {
 
     private final Log log = LogFactory.getLog(InactivityMonitor.class);
-    
+
     private WireFormatInfo localWireFormatInfo;
     private WireFormatInfo remoteWireFormatInfo;
     private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
@@ -44,20 +44,20 @@
 
     private final AtomicBoolean commandReceived=new AtomicBoolean(true);
     private final AtomicBoolean inReceive=new AtomicBoolean(false);
-    
+
     private final Runnable readChecker = new Runnable() {
         public void run() {
             readCheck();
         }
     };
-    
+
     private final Runnable writeChecker = new Runnable() {
         public void run() {
             writeCheck();
         }
     };
-    
-    
+
+
     public InactivityMonitor(Transport next) {
         super(next);
     }
@@ -67,108 +67,116 @@
         next.stop();
     }
 
-        
+
     private void writeCheck() {
-        if( inSend.get() ) {
-            log.trace("A send is in progress");
-            return;
+        synchronized(writeChecker) {
+            if( inSend.get() ) {
+                log.trace("A send is in progress");
+                return;
+            }
+
+            if( !commandSent.get() ) {
+                log.trace("No message sent since last write check, sending a KeepAliveInfo");
+                try {
+                    next.oneway(new KeepAliveInfo());
+                } catch (IOException e) {
+                    onException(e);
+                }
+            } else {
+                log.trace("Message sent since last write check, resetting flag");
+            }
+
+            commandSent.set(false);
         }
-        
-        if( !commandSent.get() ) {
-            log.trace("No message sent since last write check, sending a KeepAliveInfo");
-            try {
-                next.oneway(new KeepAliveInfo());                
-            } catch (IOException e) {
-                onException(e);
-            }
-        } else {
-            log.trace("Message sent since last write check, resetting flag");
-        }
-        
-        commandSent.set(false);
-        
     }
 
     private void readCheck() {
-        if( inReceive.get() ) {
-            log.trace("A receive is in progress");
-            return;
-        }
-        
-        if( !commandReceived.get() ) {
-            log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
-            onException(new InactivityIOException("Channel was inactive for too long."));           
-        } else {
-            log.trace("Message received since last read check, resetting flag: ");
+        synchronized(readChecker) {
+            if( inReceive.get() ) {
+                log.trace("A receive is in progress");
+                return;
+            }
+
+            if( !commandReceived.get() ) {
+                log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+                onException(new InactivityIOException("Channel was inactive for too long."));
+            } else {
+                log.trace("Message received since last read check, resetting flag: ");
+            }
+
+            commandReceived.set(false);
         }
-        
-        commandReceived.set(false);
+
     }
 
     public void onCommand(Object command) {
-        inReceive.set(true);
-        try {
-            if( command.getClass() == WireFormatInfo.class ) {
-                synchronized( this ) {
-                    remoteWireFormatInfo = (WireFormatInfo) command;
-                    try {
-                        startMonitorThreads();
-                    } catch (IOException e) {
-                        onException(e);
+        synchronized(readChecker) {
+            inReceive.set(true);
+            try {
+                if( command.getClass() == WireFormatInfo.class ) {
+                    synchronized( this ) {
+                        remoteWireFormatInfo = (WireFormatInfo) command;
+                        try {
+                            startMonitorThreads();
+                        } catch (IOException e) {
+                            onException(e);
+                        }
                     }
                 }
+                transportListener.onCommand(command);
+            } finally {
+                inReceive.set(false);
+                commandReceived.set(true);
             }
-            transportListener.onCommand(command);
-        } finally {
-            inReceive.set(false);
-            commandReceived.set(true);
         }
     }
 
-    
+
     public void oneway(Object o) throws IOException {
-        // Disable inactivity monitoring while processing a command.
-        inSend.set(true);
-        commandSent.set(true);
-        try {
-            if( o.getClass() == WireFormatInfo.class ) {
-                synchronized( this ) {
-                    localWireFormatInfo = (WireFormatInfo) o;
-                    startMonitorThreads();
+        synchronized(writeChecker) {
+            // Disable inactivity monitoring while processing a command.
+            inSend.set(true);
+            commandSent.set(true);
+            try {
+                if( o.getClass() == WireFormatInfo.class ) {
+                    synchronized( this ) {
+                        localWireFormatInfo = (WireFormatInfo) o;
+                        startMonitorThreads();
+                    }
                 }
+                next.oneway(o);
+            } finally {
+                inSend.set(false);
             }
-            next.oneway(o);
-        } finally {
-            inSend.set(false);
         }
     }
-    
+
     public void onException(IOException error) {
     	if( monitorStarted.get() ) {
 	        stopMonitorThreads();
     	}
         getTransportListener().onException(error);
     }
-    
-    
+
+
     synchronized private void startMonitorThreads() throws IOException {
-        if( monitorStarted.get() ) 
+        if( monitorStarted.get() )
             return;
         if( localWireFormatInfo == null )
             return;
         if( remoteWireFormatInfo == null )
             return;
-        
+
         long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
         if( l > 0 ) {
-            monitorStarted.set(true);        
+            monitorStarted.set(true);
             Scheduler.executePeriodically(writeChecker, l/2);
             Scheduler.executePeriodically(readChecker, l);
         }
     }
-    
+
     /**
-     * 
+     *
      */
     synchronized private void stopMonitorThreads() {
         if( monitorStarted.compareAndSet(true, false) ) {
@@ -176,6 +184,6 @@
             Scheduler.cancel(writeChecker);
         }
     }
-    
+
 
 }