You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/12 00:25:41 UTC
kafka git commit: KAFKA-1910 Follow-up again;
fix ListOffsetResponse handling for the expected error codes
Repository: kafka
Updated Branches:
refs/heads/trunk 1eb5f53aa -> 01f20e029
KAFKA-1910 Follow-up again; fix ListOffsetResponse handling for the expected error codes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01f20e02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01f20e02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01f20e02
Branch: refs/heads/trunk
Commit: 01f20e029fbd068a8493492a2c4a1364a5668579
Parents: 1eb5f53
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Mar 11 16:25:21 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 11 16:25:21 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/consumer/internals/Coordinator.java | 2 +-
.../org/apache/kafka/clients/consumer/internals/Fetcher.java | 3 ++-
.../org/apache/kafka/common/requests/ListOffsetResponse.java | 4 +++-
core/src/test/scala/integration/kafka/api/ConsumerTest.scala | 1 +
4 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e972efb..436f9b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -129,7 +129,7 @@ public final class Coordinator {
// process the response
JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
- // TODO: needs to handle disconnects and errors
+ // TODO: needs to handle disconnects and errors, should not just throw exceptions
Errors.forCode(response.errorCode()).maybeThrow();
this.consumerId = response.consumerId();
http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 27c78b8..8b71fba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,11 +231,12 @@ public class Fetcher<K, V> {
log.debug("Fetched offset {} for partition {}", offset, topicPartition);
return offset;
} else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
- || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+ || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
awaitMetadataUpdate();
} else {
+ // TODO: we should not just throw exceptions but should handle and log it.
Errors.forCode(errorCode).maybeThrow();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index af704f3..f706086 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -45,7 +45,9 @@ public class ListOffsetResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
- * TODO
+ * UNKNOWN_TOPIC_OR_PARTITION (3)
+ * NOT_LEADER_FOR_PARTITION (6)
+ * UNKNOWN (-1)
*/
private static final String OFFSETS_KEY_NAME = "offsets";
http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index fed37e3..c82bdaa 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -261,6 +261,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
override def doWork(): Unit = {
killRandomBroker()
+ Thread.sleep(500)
restartDeadBrokers()
iter += 1