You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/10/19 15:56:37 UTC
svn commit: r586460 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Author: chirino
Date: Fri Oct 19 06:56:37 2007
New Revision: 586460
URL: http://svn.apache.org/viewvc?rev=586460&view=rev
Log:
Reduced the amount of time that the synchronizations are held to avoid deadlocks
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=586460&r1=586459&r2=586460&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Fri Oct 19 06:56:37 2007
@@ -67,84 +67,86 @@
}
final void writeCheck() {
- synchronized (writeChecker) {
if (inSend.get()) {
- LOG.trace("A send is in progress");
- return;
- }
+ LOG.trace("A send is in progress");
+ return;
+ }
- if (!commandSent.get()) {
- LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
- try {
+ if (!commandSent.get()) {
+ LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
+ try {
+ synchronized (writeChecker) {
next.oneway(new KeepAliveInfo());
- } catch (IOException e) {
- onException(e);
}
- } else {
- LOG.trace("Message sent since last write check, resetting flag");
+ } catch (IOException e) {
+ onException(e);
}
-
- commandSent.set(false);
+ } else {
+ LOG.trace("Message sent since last write check, resetting flag");
}
+
+ commandSent.set(false);
}
final void readCheck() {
- synchronized (readChecker) {
- if (inReceive.get()) {
- LOG.trace("A receive is in progress");
- return;
- }
+ if (inReceive.get()) {
+ LOG.trace("A receive is in progress");
+ return;
+ }
- if (!commandReceived.get()) {
- LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+ if (!commandReceived.get()) {
+ LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+ synchronized (readChecker) {
onException(new InactivityIOException("Channel was inactive for too long."));
- } else {
- LOG.trace("Message received since last read check, resetting flag: ");
}
-
- commandReceived.set(false);
+ } else {
+ LOG.trace("Message received since last read check, resetting flag: ");
}
-
+ commandReceived.set(false);
}
public void onCommand(Object command) {
- synchronized (readChecker) {
- inReceive.set(true);
- try {
- if (command.getClass() == WireFormatInfo.class) {
- synchronized (this) {
- remoteWireFormatInfo = (WireFormatInfo)command;
- try {
- startMonitorThreads();
- } catch (IOException e) {
- onException(e);
- }
+ inReceive.set(true);
+ try {
+ if (command.getClass() == WireFormatInfo.class) {
+ synchronized (this) {
+ IOException error=null;
+ remoteWireFormatInfo = (WireFormatInfo)command;
+ try {
+ startMonitorThreads();
+ } catch (IOException e) {
+ error = e;
+ }
+ if( error!=null ) {
+ onException(error);
}
}
+ }
+ synchronized (readChecker) {
transportListener.onCommand(command);
- } finally {
- inReceive.set(false);
- commandReceived.set(true);
}
+ } finally {
+ commandReceived.set(true);
+ inReceive.set(false);
}
}
public void oneway(Object o) throws IOException {
- synchronized (writeChecker) {
- // Disable inactivity monitoring while processing a command.
- inSend.set(true);
- commandSent.set(true);
- try {
- if (o.getClass() == WireFormatInfo.class) {
- synchronized (this) {
- localWireFormatInfo = (WireFormatInfo)o;
- startMonitorThreads();
- }
+ // Disable inactivity monitoring while processing a command.
+ inSend.set(true);
+ try {
+ if (o.getClass() == WireFormatInfo.class) {
+ synchronized (this) {
+ localWireFormatInfo = (WireFormatInfo)o;
+ startMonitorThreads();
}
+ }
+ synchronized (writeChecker) {
next.oneway(o);
- } finally {
- inSend.set(false);
}
+ } finally {
+ commandSent.set(true);
+ inSend.set(false);
}
}
@@ -152,7 +154,9 @@
if (monitorStarted.get()) {
stopMonitorThreads();
}
- getTransportListener().onException(error);
+ synchronized (readChecker) {
+ transportListener.onException(error);
+ }
}
private synchronized void startMonitorThreads() throws IOException {