You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2017/09/15 17:15:48 UTC

[2/2] activemq git commit: AMQ-6813: update tick deadline handling to account for potential to be negative due to using nanoTime derived values, plus other edge case protections

AMQ-6813: update tick deadline handling to account for the deadline potentially being negative (it is derived from nanoTime values), plus other edge-case protections

(cherry picked from commit f82eccd2f504b59c2e98ba8273e28f4d7a2a8698)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4f6326f4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4f6326f4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4f6326f4

Branch: refs/heads/activemq-5.15.x
Commit: 4f6326f4fb35867b6d83c624a947a4510d0f674f
Parents: e1ac826
Author: Robbie Gemmell <ro...@apache.org>
Authored: Fri Sep 15 17:52:36 2017 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Fri Sep 15 18:04:01 2017 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java      | 15 ++++++++++-----
 .../transport/amqp/client/AmqpConnection.java        | 15 ++++++++-------
 2 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4f6326f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 57b2502..dec1bc9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -251,12 +251,16 @@ public class AmqpConnection implements AmqpProtocolConverter {
         if (protonConnection.getLocalState() != EndpointState.CLOSED) {
             // Using nano time since it is not related to the wall clock, which may change
             long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
-            rescheduleAt = protonTransport.tick(now) - now;
+            long deadline = protonTransport.tick(now);
             pumpProtonToSocket();
             if (protonTransport.isClosed()) {
-                rescheduleAt = 0;
                 LOG.debug("Transport closed after inactivity check.");
-                throw new InactivityIOException("Channel was inactive for to long");
+                throw new InactivityIOException("Channel was inactive for too long");
+            } else {
+                if(deadline != 0) {
+                    // caller treats 0 as no-work, ensure value is at least 1 as there was a deadline
+                    rescheduleAt = Math.max(deadline - now, 1);
+                }
             }
         }
 
@@ -835,8 +839,9 @@ public class AmqpConnection implements AmqpProtocolConverter {
         // Using nano time since it is not related to the wall clock, which may change
         long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         long nextIdleCheck = protonTransport.tick(now);
-        if (nextIdleCheck > 0) {
-            long delay = nextIdleCheck - now;
+        if (nextIdleCheck != 0) {
+            // monitor treats <= 0 as no work, ensure value is at least 1 as there was a deadline
+            long delay = Math.max(nextIdleCheck - now, 1);
             LOG.trace("Connection keep-alive processing starts in: {}", delay);
             monitor.startKeepAliveTask(delay);
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4f6326f4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 1f3fe09..3fe3ab6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -587,7 +587,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                 // Using nano time since it is not related to the wall clock, which may change
                 long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
                 long initialKeepAliveDeadline = protonTransport.tick(initialNow);
-                if (initialKeepAliveDeadline > 0) {
+                if (initialKeepAliveDeadline != 0) {
 
                     getScheduler().schedule(new Runnable() {
 
@@ -598,15 +598,16 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                                     LOG.debug("Client performing next idle check");
                                     // Using nano time since it is not related to the wall clock, which may change
                                     long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
-                                    long rescheduleAt = protonTransport.tick(now) - now;
+                                    long deadline = protonTransport.tick(now);
+
                                     pumpToProtonTransport();
                                     if (protonTransport.isClosed()) {
                                         LOG.debug("Transport closed after inactivity check.");
-                                        throw new InactivityIOException("Channel was inactive for to long");
-                                    }
-
-                                    if (rescheduleAt > 0) {
-                                        getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
+                                        throw new InactivityIOException("Channel was inactive for too long");
+                                    } else {
+                                        if(deadline != 0) {
+                                            getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);
+                                        }
                                     }
                                 }
                             } catch (Exception e) {