You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/02 20:39:53 UTC

svn commit: r1405122 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java

Author: tabish
Date: Fri Nov  2 19:39:53 2012
New Revision: 1405122

URL: http://svn.apache.org/viewvc?rev=1405122&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4156

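Add more defensive checks when queuing read and write checks to the static executor: skip queuing when the executor is terminating or already terminated, and catch RejectedExecutionException, rethrowing it only if the executor is not shutting down.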

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java?rev=1405122&r1=1405121&r2=1405122&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java Fri Nov  2 19:39:53 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.transport;
 
 import java.io.IOException;
 import java.util.Timer;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -107,10 +108,11 @@ public abstract class AbstractInactivity
 
     private final Runnable writeChecker = new Runnable() {
         long lastRunTime;
+
         public void run() {
             long now = System.currentTimeMillis();
-            if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
-                LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
+            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
+                LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
 
             }
             lastRunTime = now;
@@ -146,39 +148,49 @@ public abstract class AbstractInactivity
             return;
         }
 
-        if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+        if (!commandSent.get() && useKeepAlive && monitorStarted.get() &&
+            !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
             }
-            ASYNC_TASKS.execute(new Runnable() {
-                public void run() {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Running {}", this);
-                    }
-                    if (monitorStarted.get()) {
-                        try {
-                            // If we can't get the lock it means another write beat us into the
-                            // send and we don't need to heart beat now.
-                            if (sendLock.writeLock().tryLock()) {
-                                KeepAliveInfo info = new KeepAliveInfo();
-                                info.setResponseRequired(keepAliveResponseRequired);
-                                doOnewaySend(info);
+
+            try {
+                ASYNC_TASKS.execute(new Runnable() {
+                    public void run() {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Running {}", this);
+                        }
+                        if (monitorStarted.get()) {
+                            try {
+                                // If we can't get the lock it means another write beat us into the
+                                // send and we don't need to heart beat now.
+                                if (sendLock.writeLock().tryLock()) {
+                                    KeepAliveInfo info = new KeepAliveInfo();
+                                    info.setResponseRequired(keepAliveResponseRequired);
+                                    doOnewaySend(info);
+                                }
+                            } catch (IOException e) {
+                                onException(e);
+                            } finally {
+                                 if (sendLock.writeLock().isHeldByCurrentThread()) {
+                                    sendLock.writeLock().unlock();
+                                 }
                             }
-                        } catch (IOException e) {
-                            onException(e);
-                        } finally {
-                             if (sendLock.writeLock().isHeldByCurrentThread()) {
-                                sendLock.writeLock().unlock();
-                             }
                         }
                     }
-                }
 
-                @Override
-                public String toString() {
-                    return "WriteCheck[" + getRemoteAddress() + "]";
-                };
-            });
+                    @Override
+                    public String toString() {
+                        return "WriteCheck[" + getRemoteAddress() + "]";
+                    };
+                });
+            } catch (RejectedExecutionException ex) {
+                if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+                    LOG.error("Async write check was rejected from the executor: ", ex);
+                    throw ex;
+                }
+            }
         } else {
             if (LOG.isTraceEnabled()) {
                 LOG.trace(this + " message sent since last write check, resetting flag");
@@ -197,23 +209,33 @@ public abstract class AbstractInactivity
             }
             return;
         }
-        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+        if (!commandReceived.get() && monitorStarted.get() &&
+            !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+
             if (LOG.isDebugEnabled()) {
                 LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
             }
-            ASYNC_TASKS.execute(new Runnable() {
-                public void run() {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Running {}", this);
+
+            try {
+                ASYNC_TASKS.execute(new Runnable() {
+                    public void run() {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Running {}", this);
+                        }
+                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
                     }
-                    onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
-                }
 
-                @Override
-                public String toString() {
-                    return "ReadCheck[" + getRemoteAddress() + "]";
-                };
-            });
+                    @Override
+                    public String toString() {
+                        return "ReadCheck[" + getRemoteAddress() + "]";
+                    };
+                });
+            } catch (RejectedExecutionException ex) {
+                if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+                    LOG.error("Async read check was rejected from the executor: ", ex);
+                    throw ex;
+                }
+            }
         } else {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Message received since last read check, resetting flag: ");
@@ -278,8 +300,8 @@ public abstract class AbstractInactivity
 
     // Must be called under lock, either read or write on sendLock.
     private void doOnewaySend(Object command) throws IOException {
-        if( failed.get() ) {
-            throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
+        if (failed.get()) {
+            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
         }
         if (command.getClass() == WireFormatInfo.class) {
             synchronized (this) {
@@ -382,13 +404,13 @@ public abstract class AbstractInactivity
             if (writeCheckerTask != null) {
                 writeCheckerTask.cancel();
             }
-            synchronized( AbstractInactivityMonitor.class ) {
+            synchronized (AbstractInactivityMonitor.class) {
                 WRITE_CHECK_TIMER.purge();
                 READ_CHECK_TIMER.purge();
                 CHECKER_COUNTER--;
-                if(CHECKER_COUNTER==0) {
-                  WRITE_CHECK_TIMER.cancel();
-                  READ_CHECK_TIMER.cancel();
+                if (CHECKER_COUNTER == 0) {
+                    WRITE_CHECK_TIMER.cancel();
+                    READ_CHECK_TIMER.cancel();
                     WRITE_CHECK_TIMER = null;
                     READ_CHECK_TIMER = null;
                     ThreadPoolUtils.shutdown(ASYNC_TASKS);