You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/10/19 15:56:37 UTC

svn commit: r586460 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java

Author: chirino
Date: Fri Oct 19 06:56:37 2007
New Revision: 586460

URL: http://svn.apache.org/viewvc?rev=586460&view=rev
Log:
Reduced the amount of time that the synchronizations are held to avoid deadlocks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=586460&r1=586459&r2=586460&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Fri Oct 19 06:56:37 2007
@@ -67,84 +67,86 @@
     }
 
     final void writeCheck() {
-        synchronized (writeChecker) {
             if (inSend.get()) {
-                LOG.trace("A send is in progress");
-                return;
-            }
+            LOG.trace("A send is in progress");
+            return;
+        }
 
-            if (!commandSent.get()) {
-                LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
-                try {
+        if (!commandSent.get()) {
+            LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+            try {
+                synchronized (writeChecker) {
                     next.oneway(new KeepAliveInfo());
-                } catch (IOException e) {
-                    onException(e);
                 }
-            } else {
-                LOG.trace("Message sent since last write check, resetting flag");
+            } catch (IOException e) {
+                onException(e);
             }
-
-            commandSent.set(false);
+        } else {
+            LOG.trace("Message sent since last write check, resetting flag");
         }
+
+        commandSent.set(false);
     }
 
     final void readCheck() {
-        synchronized (readChecker) {
-            if (inReceive.get()) {
-                LOG.trace("A receive is in progress");
-                return;
-            }
+        if (inReceive.get()) {
+            LOG.trace("A receive is in progress");
+            return;
+        }
 
-            if (!commandReceived.get()) {
-                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+        if (!commandReceived.get()) {
+            LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+            synchronized (readChecker) {
                 onException(new InactivityIOException("Channel was inactive for too long."));
-            } else {
-                LOG.trace("Message received since last read check, resetting flag: ");
             }
-
-            commandReceived.set(false);
+        } else {
+            LOG.trace("Message received since last read check, resetting flag: ");
         }
-
+        commandReceived.set(false);
     }
 
     public void onCommand(Object command) {
-        synchronized (readChecker) {
-            inReceive.set(true);
-            try {
-                if (command.getClass() == WireFormatInfo.class) {
-                    synchronized (this) {
-                        remoteWireFormatInfo = (WireFormatInfo)command;
-                        try {
-                            startMonitorThreads();
-                        } catch (IOException e) {
-                            onException(e);
-                        }
+        inReceive.set(true);
+        try {
+            if (command.getClass() == WireFormatInfo.class) {
+                synchronized (this) {
+                    IOException error=null;
+                    remoteWireFormatInfo = (WireFormatInfo)command;
+                    try {
+                        startMonitorThreads();
+                    } catch (IOException e) {
+                        error = e;
+                    }
+                    if( error!=null ) {
+                        onException(error);
                     }
                 }
+            }
+            synchronized (readChecker) {
                 transportListener.onCommand(command);
-            } finally {
-                inReceive.set(false);
-                commandReceived.set(true);
             }
+        } finally {
+            commandReceived.set(true);
+            inReceive.set(false);
         }
     }
 
     public void oneway(Object o) throws IOException {
-        synchronized (writeChecker) {
-            // Disable inactivity monitoring while processing a command.
-            inSend.set(true);
-            commandSent.set(true);
-            try {
-                if (o.getClass() == WireFormatInfo.class) {
-                    synchronized (this) {
-                        localWireFormatInfo = (WireFormatInfo)o;
-                        startMonitorThreads();
-                    }
+        // Disable inactivity monitoring while processing a command.
+        inSend.set(true);
+        try {
+            if (o.getClass() == WireFormatInfo.class) {
+                synchronized (this) {
+                    localWireFormatInfo = (WireFormatInfo)o;
+                    startMonitorThreads();
                 }
+            }
+            synchronized (writeChecker) {
                 next.oneway(o);
-            } finally {
-                inSend.set(false);
             }
+        } finally {
+            commandSent.set(true);
+            inSend.set(false);
         }
     }
 
@@ -152,7 +154,9 @@
         if (monitorStarted.get()) {
             stopMonitorThreads();
         }
-        getTransportListener().onException(error);
+        synchronized (readChecker) {
+            transportListener.onException(error);
+        }
     }
 
     private synchronized void startMonitorThreads() throws IOException {