You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/02 20:39:53 UTC
svn commit: r1405122 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
Author: tabish
Date: Fri Nov 2 19:39:53 2012
New Revision: 1405122
URL: http://svn.apache.org/viewvc?rev=1405122&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4156
More defensive checks when queuing read and write checks to the static executor.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1405122&r1=1405121&r2=1405122&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Fri Nov 2 19:39:53 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.util.Timer;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -107,10 +108,11 @@ public abstract class AbstractInactivity
private final Runnable writeChecker = new Runnable() {
long lastRunTime;
+
public void run() {
long now = System.currentTimeMillis();
- if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
- LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
+ if (lastRunTime != 0 && LOG.isDebugEnabled()) {
+ LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
}
lastRunTime = now;
@@ -146,39 +148,49 @@ public abstract class AbstractInactivity
return;
}
- if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+ if (!commandSent.get() && useKeepAlive && monitorStarted.get() &&
+ !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+
if (LOG.isTraceEnabled()) {
LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
}
- ASYNC_TASKS.execute(new Runnable() {
- public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running {}", this);
- }
- if (monitorStarted.get()) {
- try {
- // If we can't get the lock it means another write beat us into the
- // send and we don't need to heart beat now.
- if (sendLock.writeLock().tryLock()) {
- KeepAliveInfo info = new KeepAliveInfo();
- info.setResponseRequired(keepAliveResponseRequired);
- doOnewaySend(info);
+
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running {}", this);
+ }
+ if (monitorStarted.get()) {
+ try {
+ // If we can't get the lock it means another write beat us into the
+ // send and we don't need to heart beat now.
+ if (sendLock.writeLock().tryLock()) {
+ KeepAliveInfo info = new KeepAliveInfo();
+ info.setResponseRequired(keepAliveResponseRequired);
+ doOnewaySend(info);
+ }
+ } catch (IOException e) {
+ onException(e);
+ } finally {
+ if (sendLock.writeLock().isHeldByCurrentThread()) {
+ sendLock.writeLock().unlock();
+ }
}
- } catch (IOException e) {
- onException(e);
- } finally {
- if (sendLock.writeLock().isHeldByCurrentThread()) {
- sendLock.writeLock().unlock();
- }
}
}
- }
- @Override
- public String toString() {
- return "WriteCheck[" + getRemoteAddress() + "]";
- };
- });
+ @Override
+ public String toString() {
+ return "WriteCheck[" + getRemoteAddress() + "]";
+ };
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ LOG.error("Async write check was rejected from the executor: ", ex);
+ throw ex;
+ }
+ }
} else {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " message sent since last write check, resetting flag");
@@ -197,23 +209,33 @@ public abstract class AbstractInactivity
}
return;
}
- if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+ if (!commandReceived.get() && monitorStarted.get() &&
+ !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
}
- ASYNC_TASKS.execute(new Runnable() {
- public void run() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running {}", this);
+
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running {}", this);
+ }
+ onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
}
- onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
- }
- @Override
- public String toString() {
- return "ReadCheck[" + getRemoteAddress() + "]";
- };
- });
+ @Override
+ public String toString() {
+ return "ReadCheck[" + getRemoteAddress() + "]";
+ };
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ LOG.error("Async read check was rejected from the executor: ", ex);
+ throw ex;
+ }
+ }
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message received since last read check, resetting flag: ");
@@ -278,8 +300,8 @@ public abstract class AbstractInactivity
// Must be called under lock, either read or write on sendLock.
private void doOnewaySend(Object command) throws IOException {
- if( failed.get() ) {
- throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
+ if (failed.get()) {
+ throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
}
if (command.getClass() == WireFormatInfo.class) {
synchronized (this) {
@@ -382,13 +404,13 @@ public abstract class AbstractInactivity
if (writeCheckerTask != null) {
writeCheckerTask.cancel();
}
- synchronized( AbstractInactivityMonitor.class ) {
+ synchronized (AbstractInactivityMonitor.class) {
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
- if(CHECKER_COUNTER==0) {
- WRITE_CHECK_TIMER.cancel();
- READ_CHECK_TIMER.cancel();
+ if (CHECKER_COUNTER == 0) {
+ WRITE_CHECK_TIMER.cancel();
+ READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
ThreadPoolUtils.shutdown(ASYNC_TASKS);