You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 20:33:01 UTC

kafka git commit: KAFKA-2723: new consumer exception cleanup (0.9.0)

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 7710b367f -> 27d44afe6


KAFKA-2723: new consumer exception cleanup (0.9.0)

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #452 from hachikuji/KAFKA-2723


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27d44afe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27d44afe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27d44afe

Branch: refs/heads/0.9.0
Commit: 27d44afe664bff45d62f72335fdbb56671561512
Parents: 7710b36
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Nov 8 11:38:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 11:38:50 2015 -0800

----------------------------------------------------------------------
 .../clients/consumer/CommitFailedException.java | 35 ++++++++
 .../consumer/InvalidOffsetException.java        | 34 ++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   | 65 +++++++++------
 .../kafka/clients/consumer/MockConsumer.java    |  2 +-
 .../consumer/NoOffsetForPartitionException.java | 25 ++++--
 .../consumer/OffsetOutOfRangeException.java     | 42 ++++++++++
 .../consumer/internals/AbstractCoordinator.java |  1 +
 .../consumer/internals/ConsumerCoordinator.java | 84 ++++++++++++--------
 .../internals/ConsumerNetworkClient.java        | 16 +++-
 .../clients/consumer/internals/Fetcher.java     | 37 ++++++---
 .../errors/OffsetOutOfRangeException.java       | 50 ------------
 .../apache/kafka/common/protocol/Errors.java    |  2 +-
 .../internals/ConsumerCoordinatorTest.java      | 33 +++++++-
 .../clients/consumer/internals/FetcherTest.java |  2 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  2 +-
 .../coordinator/GroupMetadataManager.scala      |  3 +-
 .../kafka/api/ConsumerBounceTest.scala          |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala       |  4 +-
 18 files changed, 300 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
new file mode 100644
index 0000000..39468bd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * This exception is raised when an offset commit with {@link KafkaConsumer#commitSync()} fails
+ * with an unrecoverable error. This can happen when a group rebalance completes before the commit
+ * could be successfully applied. In this case, the commit cannot generally be retried because some
+ * of the partitions may have already been assigned to another member in the group.
+ */
+public class CommitFailedException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public CommitFailedException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
new file mode 100644
index 0000000..5f8a57f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Set;
+
+/**
+ * Thrown when the offset for a set of partitions is invalid (either undefined or out of range),
+ * and no reset policy has been configured.
+ * @see NoOffsetForPartitionException
+ * @see OffsetOutOfRangeException
+ */
+public abstract class InvalidOffsetException extends KafkaException {
+
+    public InvalidOffsetException(String message) {
+        super(message);
+    }
+
+    public abstract Set<TopicPartition> partitions();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d99607d..d3616f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -752,7 +752,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Unsubscribe from all topics currently subscribed to.
+     * Unsubscribe from topics currently subscribed with {@link #subscribe(List)}. This
+     * also clears any partitions directly assigned through {@link #assign(List)}.
      */
     public void unsubscribe() {
         acquire();
@@ -800,18 +801,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
      * offset for the subscribed list of partitions
      *
+     *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
-     *            immediately with any records available now. Must not be negative.
+     *            immediately with any records that are available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
      *
-     * @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic
-     *             offset reset policy has been configured.
-     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
-     *             the defaultResetPolicy is NONE
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
+     *             partitions is undefined or out of range and no offset reset policy has been configured
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if caller does Read access to any of the subscribed
      *             topics or to the configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
+     *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -849,9 +851,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * heart-beating, auto-commits, and offset updates.
      * @param timeout The maximum time to block in the underlying poll
      * @return The fetched records (may be empty)
-     * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
@@ -879,8 +878,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         return fetcher.fetchedRecords();
     }
 
-
-
     /**
      * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partitions.
      * <p>
@@ -891,7 +888,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
      *
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link #subscribe(List)},
+     *             or if there is an active group with the same groupId which is using group management.
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the committed offset is invalid).
      */
     @Override
     public void commitSync() {
@@ -914,10 +919,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * encountered (in which case it is thrown to the caller).
      *
      * @param offsets A map of offsets by partition with associated metadata
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link #subscribe(List)},
+     *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the committed offset is invalid).
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1002,7 +1012,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Seek to the first offset for each of the given partitions
+     * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
+     * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
      */
     public void seekToBeginning(TopicPartition... partitions) {
         acquire();
@@ -1020,7 +1031,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
-     * final offset in all partitions only when poll() or position() are called.
+     * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
      */
     public void seekToEnd(TopicPartition... partitions) {
         acquire();
@@ -1041,13 +1052,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partition The partition to get the position for
      * @return The offset
-     * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
-     *             available.
-     *
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+     *             the partition
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     public long position(TopicPartition partition) {
         acquire();
@@ -1078,6 +1089,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1115,7 +1127,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param topic The topic to get partition metadata for
      * @return The list of partitions
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
@@ -1134,11 +1148,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Get metadata about partitions for all topics. This method will issue a remote call to the
-     * server.
-     *
+     * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
+     * remote call to the server.
+     *
      * @return The map of topics and its partitions
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
@@ -1190,8 +1205,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is
-     * enabled, this will commit the current offsets.
+     * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this
+     * will commit the current offsets. Note that {@link #wakeup()} cannot be used to interrupt close.
      */
     @Override
     public void close() {
@@ -1234,7 +1249,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
-     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
      */
     private void updateFetchPositions(Set<TopicPartition> partitions) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 894bc93..72fbe9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -369,7 +369,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             if (offset == null)
                 throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
         } else {
-            throw new NoOffsetForPartitionException("No offset available");
+            throw new NoOffsetForPartitionException(tp);
         }
         seek(tp, offset);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index a21f97b..70fba36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -13,17 +13,32 @@
 
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Set;
 
 /**
- * Indicates that there is no stored offset and no defined offset reset policy
+ * Indicates that there is no stored offset for a partition and no defined offset
+ * reset policy.
  */
-public class NoOffsetForPartitionException extends KafkaException {
+public class NoOffsetForPartitionException extends InvalidOffsetException {
 
     private static final long serialVersionUID = 1L;
 
-    public NoOffsetForPartitionException(String message) {
-        super(message);
+    private final TopicPartition partition;
+
+    public NoOffsetForPartitionException(TopicPartition partition) {
+        super("Undefined offset with no reset policy for partition: " + partition);
+        this.partition = partition;
+    }
+
+    public TopicPartition partition() {
+        return partition;
+    }
+
+    public Set<TopicPartition> partitions() {
+        return Collections.singleton(partition);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..3dd92fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * No reset policy has been defined, and the offsets for these partitions are either larger or smaller
+ * than the range of offsets the server has for the given partition.
+ */
+public class OffsetOutOfRangeException extends InvalidOffsetException {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+
+    public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
+        super("Offsets out of range with no configured reset policy for partitions: " + offsetOutOfRangePartitions);
+        this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
+    }
+
+    public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
+        return offsetOutOfRangePartitions;
+    }
+
+    @Override
+    public Set<TopicPartition> partitions() {
+        return offsetOutOfRangePartitions.keySet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 44371cb..4d964c9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -539,6 +539,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     @Override
     public void close() {
+        client.disableWakeups();
         maybeLeaveGroup(true);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 25d389c..1bc4050 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -304,16 +305,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     @Override
     public void close() {
+        client.disableWakeups();
         try {
-            while (true) {
-                try {
-                    maybeAutoCommitOffsetsSync();
-                    return;
-                } catch (WakeupException e) {
-                    // ignore wakeups while closing to ensure we have a chance to commit
-                    continue;
-                }
-            }
+            maybeAutoCommitOffsetsSync();
         } finally {
             super.close();
         }
@@ -336,6 +330,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         });
     }
 
+    /**
+     * Commit offsets synchronously. This method will retry until the commit completes successfully
+     * or an unrecoverable error is encountered.
+     * @param offsets The offsets to be committed
+     * @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group
+     *             or to any of the specified partitions
+     * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
+     */
     public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty())
             return;
@@ -450,45 +452,57 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                 long offset = offsetAndMetadata.offset();
 
-                short errorCode = entry.getValue();
-                if (errorCode == Errors.NONE.code()) {
+                Errors error = Errors.forCode(entry.getValue());
+                if (error == Errors.NONE) {
                     log.debug("Committed offset {} for partition {}", offset, tp);
                     if (subscriptions.isAssigned(tp))
                         // update the local cache only if the partition is still assigned
                         subscriptions.committed(tp, offsetAndMetadata);
-                } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+                    log.error("Unauthorized to commit for group {}", groupId);
                     future.raise(new GroupAuthorizationException(groupId));
                     return;
-                } else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                     unauthorizedTopics.add(tp.topic());
+                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
+                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+                    // raise the error to the user
+                    log.info("Offset commit for group {} failed on partition {} due to {}, will retry", groupId, tp, error);
+                    future.raise(error);
+                    return;
+                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                    // just retry
+                    log.info("Offset commit for group {} failed due to {}, will retry", groupId, error);
+                    future.raise(error);
+                    return;
+                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
+                        || error == Errors.REQUEST_TIMED_OUT) {
+                    log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error);
+                    coordinatorDead();
+                    future.raise(error);
+                    return;
+                } else if (error == Errors.UNKNOWN_MEMBER_ID
+                        || error == Errors.ILLEGAL_GENERATION
+                        || error == Errors.REBALANCE_IN_PROGRESS) {
+                    // need to re-join group
+                    log.error("Error {} occurred while committing offsets for group {}", error, groupId);
+                    subscriptions.needReassignment();
+                    future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance"));
+                    return;
                 } else {
-                    if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
-                        // just retry
-                        future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
-                    } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
-                            || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
-                        coordinatorDead();
-                    } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
-                            || errorCode == Errors.ILLEGAL_GENERATION.code()
-                            || errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
-                        // need to re-join group
-                        subscriptions.needReassignment();
-                    }
-
-                    log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
-
-                    future.raise(Errors.forCode(errorCode));
+                    log.error("Error committing partition {} at offset {}: {}", tp, offset, error.exception().getMessage());
+                    future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage()));
                     return;
                 }
             }
 
-            if (!unauthorizedTopics.isEmpty())
+            if (!unauthorizedTopics.isEmpty()) {
+                log.error("Unauthorized to commit to topics {}", unauthorizedTopics);
                 future.raise(new TopicAuthorizationException(unauthorizedTopics));
-            else
+            } else {
                 future.complete(null);
+            }
         }
     }
 
@@ -503,7 +517,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
-        log.debug("Fetching committed offsets for partitions: {}",  Utils.join(partitions, ", "));
+        log.debug("Fetching committed offsets for partitions: {}",  partitions);
         // construct the request
         OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index f1f1cc7..20eb45d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -52,6 +52,7 @@ public class ConsumerNetworkClient implements Closeable {
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
+    private boolean wakeupsEnabled = true;
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
@@ -140,8 +141,10 @@ public class ConsumerNetworkClient implements Closeable {
      * on the current poll if one is active, or the next poll.
      */
     public void wakeup() {
-        this.wakeup.set(true);
-        this.client.wakeup();
+        if (wakeupsEnabled) {
+            this.wakeup.set(true);
+            this.client.wakeup();
+        }
     }
 
     /**
@@ -305,6 +308,15 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
+    public void disableWakeups() {
+        this.wakeup.set(false);
+        this.wakeupsEnabled = false;
+    }
+
+    public void enableWakeups() {
+        this.wakeupsEnabled = true;
+    }
+
     @Override
     public void close() throws IOException {
         client.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7c5bca6..5907aca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -17,13 +17,18 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.*;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -50,11 +55,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
 
 /**
  * This class manage the fetching process with the brokers.
@@ -241,7 +246,7 @@ public class Fetcher<K, V> {
         else if (strategy == OffsetResetStrategy.LATEST)
             timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
         else
-            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+            throw new NoOffsetForPartitionException(partition);
 
         log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
         long offset = listOffset(partition, timestamp);
@@ -318,7 +323,7 @@ public class Fetcher<K, V> {
      *
      * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
      */
-    private void throwIfRecordTooLarge() throws OffsetOutOfRangeException {
+    private void throwIfRecordTooLarge() throws RecordTooLargeException {
         Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
         this.recordTooLargePartitions.clear();
 
@@ -576,16 +581,22 @@ public class Fetcher<K, V> {
      * Parse the record entry, deserializing the key / value fields if necessary
      */
     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
-        if (this.checkCrcs)
-            logEntry.record().ensureValid();
-
-        long offset = logEntry.offset();
-        ByteBuffer keyBytes = logEntry.record().key();
-        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
-        ByteBuffer valueBytes = logEntry.record().value();
-        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+        try {
+            if (this.checkCrcs)
+                logEntry.record().ensureValid();
+            long offset = logEntry.offset();
+            ByteBuffer keyBytes = logEntry.record().key();
+            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+            ByteBuffer valueBytes = logEntry.record().value();
+            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+
+            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
+        } catch (KafkaException e) {
+            throw e;
+        } catch (RuntimeException e) {
+            throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
+        }
 
-        return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
     }
 
     private static class PartitionRecords<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
deleted file mode 100644
index 4983bc0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-import org.apache.kafka.common.TopicPartition;
-import java.util.Map;
-
-/**
- * This offset is either larger or smaller than the range of offsets the server has for the given partition.
- * 
- */
-public class OffsetOutOfRangeException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-    private Map<TopicPartition, Long> offsetOutOfRangePartitions = null;
-
-    public OffsetOutOfRangeException() {
-    }
-
-    public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
-        this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
-    }
-
-    public OffsetOutOfRangeException(String message) {
-        super(message);
-    }
-
-    public OffsetOutOfRangeException(Throwable cause) {
-        super(cause);
-    }
-
-    public OffsetOutOfRangeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
-        return offsetOutOfRangePartitions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 516e50b..dd4f39c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -34,7 +34,7 @@ public enum Errors {
     UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
     NONE(0, null),
     OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+            new ApiException("The requested offset is not within the range of offsets maintained by the server.")),
     CORRUPT_MESSAGE(2,
             new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
     UNKNOWN_TOPIC_OR_PARTITION(3,

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8e47fc3..86ac6b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -796,7 +797,37 @@ public class ConsumerCoordinatorTest {
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
     }
 
-    @Test(expected = ApiException.class)
+    @Test(expected = CommitFailedException.class)
+    public void testCommitOffsetIllegalGeneration() {
+        // we cannot retry if a rebalance occurs before the commit completed
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+    }
+
+    @Test(expected = CommitFailedException.class)
+    public void testCommitOffsetUnknownMemberId() {
+        // we cannot retry if a rebalance occurs before the commit completed
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+    }
+
+    @Test(expected = CommitFailedException.class)
+    public void testCommitOffsetRebalanceInProgress() {
+        // we cannot retry if a rebalance occurs before the commit completed
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+    }
+
+    @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
         coordinator.ensureCoordinatorKnown();

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7b1e4cb..fe9a6aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 900830f..4d69840 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -288,7 +288,7 @@ class GroupCoordinator(val brokerId: Int,
                     if (errorCode != Errors.NONE.code) {
                       resetAndPropagateAssignmentError(group, errorCode)
                       maybePrepareRebalance(group)
-                    } else if (group.is(AwaitingSync)) {
+                    } else {
                       setAndPropagateAssignment(group, assignment)
                       group.transitionTo(Stable)
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 0c8333f..f6b8103 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -402,7 +402,8 @@ class GroupMetadataManager(val brokerId: Int,
                     val groupId = baseKey.key.asInstanceOf[String]
                     val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
 
-                    addGroup(groupId, groupMetadata)
+                    if (groupMetadata != null)
+                      addGroup(groupId, groupMetadata)
                   }
 
                   currOffset = msgAndOffset.nextOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f2b0f85..029eaf4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -110,7 +110,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       } catch {
         // TODO: should be no need to catch these exceptions once KAFKA-2017 is
         // merged since coordinator fail-over will not cause a rebalance
-        case _: UnknownMemberIdException | _: IllegalGenerationException =>
+        case _: CommitFailedException =>
       }
     }
     scheduler.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 2e7471c..93bb229 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,11 +15,11 @@ package kafka.api
 import java.util.regex.Pattern
 
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig, RoundRobinAssignor}
+import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
+import org.apache.kafka.common.errors.RecordTooLargeException
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.mutable.Buffer