You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/06/12 15:42:53 UTC
svn commit: r546482 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Author: jstrachan
Date: Tue Jun 12 06:42:52 2007
New Revision: 546482
URL: http://svn.apache.org/viewvc?view=rev&rev=546482
Log:
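applied patch for AMQ-1146 (writeCheck() and oneway() now synchronize on writeChecker; readCheck() and onCommand() now synchronize on readChecker)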
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=546482&r1=546481&r2=546482
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Jun 12 06:42:52 2007
@@ -27,14 +27,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Used to make sure that commands are arriving periodically from the peer of the transport.
- *
+ * Used to make sure that commands are arriving periodically from the peer of the transport.
+ *
* @version $Revision$
*/
public class InactivityMonitor extends TransportFilter {
private final Log log = LogFactory.getLog(InactivityMonitor.class);
-
+
private WireFormatInfo localWireFormatInfo;
private WireFormatInfo remoteWireFormatInfo;
private final AtomicBoolean monitorStarted= new AtomicBoolean(false);
@@ -44,20 +44,20 @@
private final AtomicBoolean commandReceived=new AtomicBoolean(true);
private final AtomicBoolean inReceive=new AtomicBoolean(false);
-
+
private final Runnable readChecker = new Runnable() {
public void run() {
readCheck();
}
};
-
+
private final Runnable writeChecker = new Runnable() {
public void run() {
writeCheck();
}
};
-
-
+
+
public InactivityMonitor(Transport next) {
super(next);
}
@@ -67,108 +67,116 @@
next.stop();
}
-
+
private void writeCheck() {
- if( inSend.get() ) {
- log.trace("A send is in progress");
- return;
+ synchronized(writeChecker) {
+ if( inSend.get() ) {
+ log.trace("A send is in progress");
+ return;
+ }
+
+ if( !commandSent.get() ) {
+ log.trace("No message sent since last write check, sending a KeepAliveInfo");
+ try {
+ next.oneway(new KeepAliveInfo());
+ } catch (IOException e) {
+ onException(e);
+ }
+ } else {
+ log.trace("Message sent since last write check, resetting flag");
+ }
+
+ commandSent.set(false);
}
-
- if( !commandSent.get() ) {
- log.trace("No message sent since last write check, sending a KeepAliveInfo");
- try {
- next.oneway(new KeepAliveInfo());
- } catch (IOException e) {
- onException(e);
- }
- } else {
- log.trace("Message sent since last write check, resetting flag");
- }
-
- commandSent.set(false);
-
}
private void readCheck() {
- if( inReceive.get() ) {
- log.trace("A receive is in progress");
- return;
- }
-
- if( !commandReceived.get() ) {
- log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
- onException(new InactivityIOException("Channel was inactive for too long."));
- } else {
- log.trace("Message received since last read check, resetting flag: ");
+ synchronized(readChecker) {
+ if( inReceive.get() ) {
+ log.trace("A receive is in progress");
+ return;
+ }
+
+ if( !commandReceived.get() ) {
+ log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
+ onException(new InactivityIOException("Channel was inactive for too long."));
+ } else {
+ log.trace("Message received since last read check, resetting flag: ");
+ }
+
+ commandReceived.set(false);
}
-
- commandReceived.set(false);
+
}
public void onCommand(Object command) {
- inReceive.set(true);
- try {
- if( command.getClass() == WireFormatInfo.class ) {
- synchronized( this ) {
- remoteWireFormatInfo = (WireFormatInfo) command;
- try {
- startMonitorThreads();
- } catch (IOException e) {
- onException(e);
+ synchronized(readChecker) {
+ inReceive.set(true);
+ try {
+ if( command.getClass() == WireFormatInfo.class ) {
+ synchronized( this ) {
+ remoteWireFormatInfo = (WireFormatInfo) command;
+ try {
+ startMonitorThreads();
+ } catch (IOException e) {
+ onException(e);
+ }
}
}
+ transportListener.onCommand(command);
+ } finally {
+ inReceive.set(false);
+ commandReceived.set(true);
}
- transportListener.onCommand(command);
- } finally {
- inReceive.set(false);
- commandReceived.set(true);
}
}
-
+
public void oneway(Object o) throws IOException {
- // Disable inactivity monitoring while processing a command.
- inSend.set(true);
- commandSent.set(true);
- try {
- if( o.getClass() == WireFormatInfo.class ) {
- synchronized( this ) {
- localWireFormatInfo = (WireFormatInfo) o;
- startMonitorThreads();
+ synchronized(writeChecker) {
+ // Disable inactivity monitoring while processing a command.
+ inSend.set(true);
+ commandSent.set(true);
+ try {
+ if( o.getClass() == WireFormatInfo.class ) {
+ synchronized( this ) {
+ localWireFormatInfo = (WireFormatInfo) o;
+ startMonitorThreads();
+ }
}
+ next.oneway(o);
+ } finally {
+ inSend.set(false);
}
- next.oneway(o);
- } finally {
- inSend.set(false);
}
}
-
+
public void onException(IOException error) {
if( monitorStarted.get() ) {
stopMonitorThreads();
}
getTransportListener().onException(error);
}
-
-
+
+
synchronized private void startMonitorThreads() throws IOException {
- if( monitorStarted.get() )
+ if( monitorStarted.get() )
return;
if( localWireFormatInfo == null )
return;
if( remoteWireFormatInfo == null )
return;
-
+
long l = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
if( l > 0 ) {
- monitorStarted.set(true);
+ monitorStarted.set(true);
Scheduler.executePeriodically(writeChecker, l/2);
Scheduler.executePeriodically(readChecker, l);
}
}
-
+
/**
- *
+ *
*/
synchronized private void stopMonitorThreads() {
if( monitorStarted.compareAndSet(true, false) ) {
@@ -176,6 +184,6 @@
Scheduler.cancel(writeChecker);
}
}
-
+
}