You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/02/09 18:33:30 UTC
kafka git commit: KAFKA-1925;
Fix coordinator broker id stuck with INT_MIN; reviewed by Jay Kreps
Repository: kafka
Updated Branches:
refs/heads/trunk 1c6d5bbac -> 0839def4b
KAFKA-1925; Fix coordinator broker id stuck with INT_MIN; reviewed by Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0839def4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0839def4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0839def4
Branch: refs/heads/trunk
Commit: 0839def4bdbfd9f5939e463f42cfbc81a98a8eff
Parents: 1c6d5bb
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 9 09:33:16 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 9 09:33:16 2015 -0800
----------------------------------------------------------------------
.../main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
clients/src/main/java/org/apache/kafka/common/Node.java | 2 +-
.../test/scala/unit/kafka/integration/KafkaServerTestHarness.scala | 1 +
3 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0839def4/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 09a6f11..67ceb75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1281,7 +1281,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (!resp.wasDisconnected()) {
ConsumerMetadataResponse response = new ConsumerMetadataResponse(resp.responseBody());
if (response.errorCode() == Errors.NONE.code())
- return new Node(Integer.MIN_VALUE, response.node().host(), response.node().port());
+ return new Node(Integer.MAX_VALUE - response.node().id(), response.node().host(), response.node().port());
}
}
return null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0839def4/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 0e47ff3..88c3b24 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -82,7 +82,7 @@ public class Node {
@Override
public String toString() {
- return "Node(" + (id < 0 ? "" : id + ", ") + host + ", " + port + ")";
+ return "Node(" + id + ", " + host + ", " + port + ")";
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0839def4/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index ef4c9ae..dc0512b 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -64,6 +64,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
val index = TestUtils.random.nextInt(servers.length)
if(alive(index)) {
servers(index).shutdown()
+ servers(index).awaitShutdown()
alive(index) = false
}
index