You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/17 12:26:20 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - tidy up exception trace and async check, fix leveldb async test regressions

Repository: activemq
Updated Branches:
  refs/heads/trunk 76e29bdf9 -> 4705f95be


https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - tidy up exception trace and async check, fix leveldb async test regressions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4705f95b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4705f95b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4705f95b

Branch: refs/heads/trunk
Commit: 4705f95becc983101d3534d573a45b157b400191
Parents: 76e29bd
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 17 11:24:48 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 17 11:25:47 2014 +0100

----------------------------------------------------------------------
 .../broker/region/cursors/AbstractStoreCursor.java | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4705f95b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index c4bf985..9998152 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.region.cursors;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -249,9 +250,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
                     try {
                         future.get(5, TimeUnit.SECONDS);
                         setLastCachedId(ASYNC_ADD, lastPending);
+                    } catch (CancellationException ok) {
+                        continue;
                     } catch (TimeoutException potentialDeadlock) {
-                        LOG.warn("{} timed out waiting for async add", this, potentialDeadlock);
-                    } catch (Exception cancelledOrTimeOutOrErrorWorstCaseWeReplay) {cancelledOrTimeOutOrErrorWorstCaseWeReplay.printStackTrace();}
+                        LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
+                    } catch (Exception worstCaseWeReplay) {
+                        LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
+                    }
                 } else {
                     setLastCachedId(ASYNC_ADD, lastPending);
                 }
@@ -259,7 +264,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             }
             if (lastCachedIds[ASYNC_ADD] != null) {
                 // ensure we don't skip current possibly sync add b/c we waited on the future
-                if (currentAdd.isRecievedByDFBridge() || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
+                if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
                     setBatch(lastCachedIds[ASYNC_ADD]);
                 }
             }
@@ -272,7 +277,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     }
 
     private void trackLastCached(MessageReference node) {
-        if (node.getMessageId().getFutureOrSequenceLong() instanceof Future || node.getMessage().isRecievedByDFBridge()) {
+        if (isAsync(node.getMessage())) {
             pruneLastCached();
             pendingCachedIds.add(node.getMessageId());
         } else {
@@ -280,6 +285,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         }
     }
 
+    private static final boolean isAsync(Message message) {
+        return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future;
+    }
+
     private void pruneLastCached() {
         for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
             MessageId candidate = it.next();