You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/03/12 00:25:41 UTC

kafka git commit: KAFKA-1910 Follow-up again; fix ListOffsetResponse handling for the expected error codes

Repository: kafka
Updated Branches:
  refs/heads/trunk 1eb5f53aa -> 01f20e029


KAFKA-1910 Follow-up again; fix ListOffsetResponse handling for the expected error codes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01f20e02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01f20e02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01f20e02

Branch: refs/heads/trunk
Commit: 01f20e029fbd068a8493492a2c4a1364a5668579
Parents: 1eb5f53
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Mar 11 16:25:21 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 11 16:25:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/consumer/internals/Coordinator.java | 2 +-
 .../org/apache/kafka/clients/consumer/internals/Fetcher.java     | 3 ++-
 .../org/apache/kafka/common/requests/ListOffsetResponse.java     | 4 +++-
 core/src/test/scala/integration/kafka/api/ConsumerTest.scala     | 1 +
 4 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e972efb..436f9b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -129,7 +129,7 @@ public final class Coordinator {
 
         // process the response
         JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
-        // TODO: needs to handle disconnects and errors
+        // TODO: needs to handle disconnects and errors, should not just throw exceptions
         Errors.forCode(response.errorCode()).maybeThrow();
         this.consumerId = response.consumerId();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 27c78b8..8b71fba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,11 +231,12 @@ public class Fetcher<K, V> {
                         log.debug("Fetched offset {} for partition {}", offset, topicPartition);
                         return offset;
                     } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                        || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+                        || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                         log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                             topicPartition);
                         awaitMetadataUpdate();
                     } else {
+                        // TODO: we should not just throw exceptions but should handle and log it.
                         Errors.forCode(errorCode).maybeThrow();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index af704f3..f706086 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -45,7 +45,9 @@ public class ListOffsetResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * TODO
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  NOT_LEADER_FOR_PARTITION (6)
+     *  UNKNOWN (-1)
      */
 
     private static final String OFFSETS_KEY_NAME = "offsets";

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f20e02/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index fed37e3..c82bdaa 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -261,6 +261,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
     override def doWork(): Unit = {
       killRandomBroker()
+      Thread.sleep(500)
       restartDeadBrokers()
 
       iter += 1