You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/10 20:48:54 UTC

kafka git commit: HOTFIX: Fix HerderRequest.compareTo()

Repository: kafka
Updated Branches:
  refs/heads/trunk 7c7becd4c -> 8e9e17767


HOTFIX: Fix HerderRequest.compareTo()

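With KAFKA-3008 (#1788), HerderRequest.compareTo() does not respect the Comparable contract that 'sgn(x.compareTo(y)) == -sgn(y.compareTo(x))': when two requests are tied on their scheduled time it always returns 1, regardless of argument order.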

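This fix addresses the hang with JDK8 in DistributedHerderTest.compareTo() by breaking ties on the scheduled time with a per-herder sequence number assigned when the request is added, so requests with equal times are still ordered by insertion.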

Author: Shikhar Bhushan <sh...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2232 from shikhar/herderreq-compareto


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e9e1776
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e9e1776
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e9e1776

Branch: refs/heads/trunk
Commit: 8e9e1776790abb0a9b103f75d0231dd66d09e68f
Parents: 7c7becd
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Sat Dec 10 12:48:51 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Dec 10 12:48:51 2016 -0800

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  | 25 ++++++++++++--------
 .../distributed/DistributedHerderTest.java      | 13 ++++++++++
 2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a8799c6..25dfc6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * <p>
@@ -103,6 +104,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
 
+    private final AtomicLong requestSeqNum = new AtomicLong();
+
     private final Time time;
 
     private final String workerGroupId;
@@ -125,7 +128,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     // To handle most external requests, like creating or destroying a connector, we can use a generic request where
     // the caller specifies all the code that should be executed.
-    private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
+    final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -1016,15 +1019,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return false;
     }
 
-    private void addRequest(Callable<Void> action, Callback<Void> callback) {
-        addRequest(0, action, callback);
+    HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
+        return addRequest(0, action, callback);
     }
 
-    private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
-        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
+    HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
+        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
         requests.add(req);
         if (peekWithoutException() == req)
             member.wakeup();
+        return req;
     }
 
     private HerderRequest peekWithoutException() {
@@ -1091,13 +1095,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    private static class HerderRequest implements Comparable<HerderRequest> {
+    static class HerderRequest implements Comparable<HerderRequest> {
         private final long at;
+        private final long seq;
         private final Callable<Void> action;
         private final Callback<Void> callback;
 
-        public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) {
+        public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
             this.at = at;
+            this.seq = seq;
             this.action = action;
             this.callback = callback;
         }
@@ -1112,9 +1118,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         @Override
         public int compareTo(HerderRequest o) {
-            final int soonest = Long.compare(at, o.at);
-            // If tied, returning a positive value should respect insertion order.
-            return soonest != 0 ? soonest : 1;
+            final int cmp = Long.compare(at, o.at);
+            return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 5be2044..6a36957 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -732,6 +732,19 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testRequestProcessingOrder() throws Exception {
+        final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null);
+        final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null);
+        final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null);
+        final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null);
+
+        assertEquals(req2, herder.requests.pollFirst()); // lowest delay
+        assertEquals(req1, herder.requests.pollFirst()); // next lowest delay
+        assertEquals(req3, herder.requests.pollFirst()); // same delay as req4, but added first
+        assertEquals(req4, herder.requests.pollFirst());
+    }
+
+    @Test
     public void testRestartTaskRedirectToLeader() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");