You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/10 20:48:54 UTC
kafka git commit: HOTFIX: Fix HerderRequest.compareTo()
Repository: kafka
Updated Branches:
refs/heads/trunk 7c7becd4c -> 8e9e17767
HOTFIX: Fix HerderRequest.compareTo()
Since KAFKA-3008 (#1788), the implementation of HerderRequest.compareTo() has not respected the Comparable contract that 'sgn(x.compareTo(y)) == -sgn(y.compareTo(x))'.
This fix addresses the hang observed under JDK8 in DistributedHerderTest.compareTo().
Author: Shikhar Bhushan <sh...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2232 from shikhar/herderreq-compareto
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e9e1776
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e9e1776
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e9e1776
Branch: refs/heads/trunk
Commit: 8e9e1776790abb0a9b103f75d0231dd66d09e68f
Parents: 7c7becd
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Sat Dec 10 12:48:51 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Dec 10 12:48:51 2016 -0800
----------------------------------------------------------------------
.../runtime/distributed/DistributedHerder.java | 25 ++++++++++++--------
.../distributed/DistributedHerderTest.java | 13 ++++++++++
2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a8799c6..25dfc6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
* <p>
@@ -103,6 +104,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;
+ private final AtomicLong requestSeqNum = new AtomicLong();
+
private final Time time;
private final String workerGroupId;
@@ -125,7 +128,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
- private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
+ final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
// Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -1016,15 +1019,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return false;
}
- private void addRequest(Callable<Void> action, Callback<Void> callback) {
- addRequest(0, action, callback);
+ HerderRequest addRequest(Callable<Void> action, Callback<Void> callback) {
+ return addRequest(0, action, callback);
}
- private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
- HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
+ HerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
+ HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
requests.add(req);
if (peekWithoutException() == req)
member.wakeup();
+ return req;
}
private HerderRequest peekWithoutException() {
@@ -1091,13 +1095,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
- private static class HerderRequest implements Comparable<HerderRequest> {
+ static class HerderRequest implements Comparable<HerderRequest> {
private final long at;
+ private final long seq;
private final Callable<Void> action;
private final Callback<Void> callback;
- public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) {
+ public HerderRequest(long at, long seq, Callable<Void> action, Callback<Void> callback) {
this.at = at;
+ this.seq = seq;
this.action = action;
this.callback = callback;
}
@@ -1112,9 +1118,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
@Override
public int compareTo(HerderRequest o) {
- final int soonest = Long.compare(at, o.at);
- // If tied, returning a positive value should respect insertion order.
- return soonest != 0 ? soonest : 1;
+ final int cmp = Long.compare(at, o.at);
+ return cmp == 0 ? Long.compare(seq, o.seq) : cmp;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e9e1776/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 5be2044..6a36957 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -732,6 +732,19 @@ public class DistributedHerderTest {
}
@Test
+ public void testRequestProcessingOrder() throws Exception {
+ final DistributedHerder.HerderRequest req1 = herder.addRequest(100, null, null);
+ final DistributedHerder.HerderRequest req2 = herder.addRequest(10, null, null);
+ final DistributedHerder.HerderRequest req3 = herder.addRequest(200, null, null);
+ final DistributedHerder.HerderRequest req4 = herder.addRequest(200, null, null);
+
+ assertEquals(req2, herder.requests.pollFirst()); // lowest delay
+ assertEquals(req1, herder.requests.pollFirst()); // next lowest delay
+ assertEquals(req3, herder.requests.pollFirst()); // same delay as req4, but added first
+ assertEquals(req4, herder.requests.pollFirst());
+ }
+
+ @Test
public void testRestartTaskRedirectToLeader() throws Exception {
// get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("member");