You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/08 20:33:01 UTC
kafka git commit: KAFKA-2723: new consumer exception cleanup (0.9.0)
Repository: kafka
Updated Branches:
refs/heads/0.9.0 7710b367f -> 27d44afe6
KAFKA-2723: new consumer exception cleanup (0.9.0)
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #452 from hachikuji/KAFKA-2723
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27d44afe
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27d44afe
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27d44afe
Branch: refs/heads/0.9.0
Commit: 27d44afe664bff45d62f72335fdbb56671561512
Parents: 7710b36
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Nov 8 11:38:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Nov 8 11:38:50 2015 -0800
----------------------------------------------------------------------
.../clients/consumer/CommitFailedException.java | 35 ++++++++
.../consumer/InvalidOffsetException.java | 34 ++++++++
.../kafka/clients/consumer/KafkaConsumer.java | 65 +++++++++------
.../kafka/clients/consumer/MockConsumer.java | 2 +-
.../consumer/NoOffsetForPartitionException.java | 25 ++++--
.../consumer/OffsetOutOfRangeException.java | 42 ++++++++++
.../consumer/internals/AbstractCoordinator.java | 1 +
.../consumer/internals/ConsumerCoordinator.java | 84 ++++++++++++--------
.../internals/ConsumerNetworkClient.java | 16 +++-
.../clients/consumer/internals/Fetcher.java | 37 ++++++---
.../errors/OffsetOutOfRangeException.java | 50 ------------
.../apache/kafka/common/protocol/Errors.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 33 +++++++-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../kafka/coordinator/GroupCoordinator.scala | 2 +-
.../coordinator/GroupMetadataManager.scala | 3 +-
.../kafka/api/ConsumerBounceTest.scala | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 4 +-
18 files changed, 300 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
new file mode 100644
index 0000000..39468bd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitFailedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * This exception is raised when an offset commit with {@link KafkaConsumer#commitSync()} fails
+ * with an unrecoverable error. This can happen when a group rebalance completes before the commit
+ * could be successfully applied. In this case, the commit cannot generally be retried because some
+ * of the partitions may have already been assigned to another member in the group.
+ */
+public class CommitFailedException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CommitFailedException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
new file mode 100644
index 0000000..5f8a57f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/InvalidOffsetException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Set;
+
+/**
+ * Thrown when the offset for a set of partitions is invalid (either undefined or out of range),
+ * and no reset policy has been configured.
+ * @see NoOffsetForPartitionException
+ * @see OffsetOutOfRangeException
+ */
+public abstract class InvalidOffsetException extends KafkaException {
+
+ public InvalidOffsetException(String message) {
+ super(message);
+ }
+
+ public abstract Set<TopicPartition> partitions();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d99607d..d3616f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -752,7 +752,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Unsubscribe from all topics currently subscribed to.
+ * Unsubscribe from topics currently subscribed with {@link #subscribe(List)}. This
+ * also clears any partitions directly assigned through {@link #assign(List)}.
*/
public void unsubscribe() {
acquire();
@@ -800,18 +801,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
* offset for the subscribed list of partitions
*
+ *
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
- * immediately with any records available now. Must not be negative.
+ * immediately with any records that are available now. Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
- * @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic
- * offset reset policy has been configured.
- * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
- * the defaultResetPolicy is NONE
+ * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
+ * partitions is undefined or out of range and no offset reset policy has been configured
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read access to any of the subscribed
* topics or to the configured groupId
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
+ * session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
@@ -849,9 +851,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* heart-beating, auto-commits, and offset updates.
* @param timeout The maximum time to block in the underlying poll
* @return The fetched records (may be empty)
- * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
- * the defaultResetPolicy is NONE
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
@@ -879,8 +878,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return fetcher.fetchedRecords();
}
-
-
/**
* Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partitions.
* <p>
@@ -891,7 +888,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
*
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+ * This can only occur if you are using automatic group management with {@link #subscribe(List)},
+ * or if there is an active group with the same groupId which is using group management.
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+ * is too large or if the committed offset is invalid).
*/
@Override
public void commitSync() {
@@ -914,10 +919,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* encountered (in which case it is thrown to the caller).
*
* @param offsets A map of offsets by partition with associated metadata
+ * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+ * This can only occur if you are using automatic group management with {@link #subscribe(List)},
+ * or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+ * is too large or if the committed offset is invalid).
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1002,7 +1012,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Seek to the first offset for each of the given partitions
+ * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
+ * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
*/
public void seekToBeginning(TopicPartition... partitions) {
acquire();
@@ -1020,7 +1031,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
- * final offset in all partitions only when poll() or position() are called.
+ * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
*/
public void seekToEnd(TopicPartition... partitions) {
acquire();
@@ -1041,13 +1052,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to get the position for
* @return The offset
- * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
- * available.
- *
+ * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+ * the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
public long position(TopicPartition partition) {
acquire();
@@ -1078,6 +1089,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1115,7 +1127,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param topic The topic to get partition metadata for
* @return The list of partitions
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
@@ -1134,11 +1148,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Get metadata about partitions for all topics. This method will issue a remote call to the
- * server.
- *
+ * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
+ * remote call to the server.
+ *
* @return The map of topics and their partitions
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
@@ -1190,8 +1205,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is
- * enabled, this will commit the current offsets.
+ * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this
+ * will commit the current offsets. Note that {@link #wakeup()} cannot be used to interrupt close.
*/
@Override
public void close() {
@@ -1234,7 +1249,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions that need their fetch positions updated
- * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+ * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 894bc93..72fbe9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -369,7 +369,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else {
- throw new NoOffsetForPartitionException("No offset available");
+ throw new NoOffsetForPartitionException(tp);
}
seek(tp, offset);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index a21f97b..70fba36 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -13,17 +13,32 @@
package org.apache.kafka.clients.consumer;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Set;
/**
- * Indicates that there is no stored offset and no defined offset reset policy
+ * Indicates that there is no stored offset for a partition and no defined offset
+ * reset policy.
*/
-public class NoOffsetForPartitionException extends KafkaException {
+public class NoOffsetForPartitionException extends InvalidOffsetException {
private static final long serialVersionUID = 1L;
- public NoOffsetForPartitionException(String message) {
- super(message);
+ private final TopicPartition partition;
+
+ public NoOffsetForPartitionException(TopicPartition partition) {
+ super("Undefined offset with no reset policy for partition: " + partition);
+ this.partition = partition;
+ }
+
+ public TopicPartition partition() {
+ return partition;
+ }
+
+ public Set<TopicPartition> partitions() {
+ return Collections.singleton(partition);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..3dd92fb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetOutOfRangeException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * No reset policy has been defined, and the offsets for these partitions are either larger or smaller
+ * than the range of offsets the server has for the given partition.
+ */
+public class OffsetOutOfRangeException extends InvalidOffsetException {
+
+ private static final long serialVersionUID = 1L;
+ private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+
+ public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
+ super("Offsets out of range with no configured reset policy for partitions: " + offsetOutOfRangePartitions);
+ this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
+ }
+
+ public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
+ return offsetOutOfRangePartitions;
+ }
+
+ @Override
+ public Set<TopicPartition> partitions() {
+ return offsetOutOfRangePartitions.keySet();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 44371cb..4d964c9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -539,6 +539,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
@Override
public void close() {
+ client.disableWakeups();
maybeLeaveGroup(true);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 25d389c..1bc4050 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -304,16 +305,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
public void close() {
+ client.disableWakeups();
try {
- while (true) {
- try {
- maybeAutoCommitOffsetsSync();
- return;
- } catch (WakeupException e) {
- // ignore wakeups while closing to ensure we have a chance to commit
- continue;
- }
- }
+ maybeAutoCommitOffsetsSync();
} finally {
super.close();
}
@@ -336,6 +330,14 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
});
}
+ /**
+ * Commit offsets synchronously. This method will retry until the commit completes successfully
+ * or an unrecoverable error is encountered.
+ * @param offsets The offsets to be committed
+ * @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group
+ * or to any of the specified partitions
+ * @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
+ */
public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return;
@@ -450,45 +452,57 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
long offset = offsetAndMetadata.offset();
- short errorCode = entry.getValue();
- if (errorCode == Errors.NONE.code()) {
+ Errors error = Errors.forCode(entry.getValue());
+ if (error == Errors.NONE) {
log.debug("Committed offset {} for partition {}", offset, tp);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
- } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+ log.error("Unauthorized to commit for group {}", groupId);
future.raise(new GroupAuthorizationException(groupId));
return;
- } else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+ } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
+ } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
+ || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+ // raise the error to the user
+ log.info("Offset commit for group {} failed on partition {} due to {}, will retry", groupId, tp, error);
+ future.raise(error);
+ return;
+ } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+ // just retry
+ log.info("Offset commit for group {} failed due to {}, will retry", groupId, error);
+ future.raise(error);
+ return;
+ } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR_FOR_GROUP
+ || error == Errors.REQUEST_TIMED_OUT) {
+ log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error);
+ coordinatorDead();
+ future.raise(error);
+ return;
+ } else if (error == Errors.UNKNOWN_MEMBER_ID
+ || error == Errors.ILLEGAL_GENERATION
+ || error == Errors.REBALANCE_IN_PROGRESS) {
+ // need to re-join group
+ log.error("Error {} occurred while committing offsets for group {}", error, groupId);
+ subscriptions.needReassignment();
+ future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance"));
+ return;
} else {
- if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
- // just retry
- future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
- } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
- coordinatorDead();
- } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
- || errorCode == Errors.ILLEGAL_GENERATION.code()
- || errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
- // need to re-join group
- subscriptions.needReassignment();
- }
-
- log.error("Error committing partition {} at offset {}: {}",
- tp,
- offset,
- Errors.forCode(errorCode).exception().getMessage());
-
- future.raise(Errors.forCode(errorCode));
+ log.error("Error committing partition {} at offset {}: {}", tp, offset, error.exception().getMessage());
+ future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage()));
return;
}
}
- if (!unauthorizedTopics.isEmpty())
+ if (!unauthorizedTopics.isEmpty()) {
+ log.error("Unauthorized to commit to topics {}", unauthorizedTopics);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
- else
+ } else {
future.complete(null);
+ }
}
}
@@ -503,7 +517,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
- log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+ log.debug("Fetching committed offsets for partitions: {}", partitions);
// construct the request
OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index f1f1cc7..20eb45d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -52,6 +52,7 @@ public class ConsumerNetworkClient implements Closeable {
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
+ private boolean wakeupsEnabled = true;
public ConsumerNetworkClient(KafkaClient client,
Metadata metadata,
@@ -140,8 +141,10 @@ public class ConsumerNetworkClient implements Closeable {
* on the current poll if one is active, or the next poll.
*/
public void wakeup() {
- this.wakeup.set(true);
- this.client.wakeup();
+ if (wakeupsEnabled) {
+ this.wakeup.set(true);
+ this.client.wakeup();
+ }
}
/**
@@ -305,6 +308,15 @@ public class ConsumerNetworkClient implements Closeable {
}
}
+ public void disableWakeups() {
+ this.wakeup.set(false);
+ this.wakeupsEnabled = false;
+ }
+
+ public void enableWakeups() {
+ this.wakeupsEnabled = true;
+ }
+
@Override
public void close() throws IOException {
client.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7c5bca6..5907aca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -17,13 +17,18 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.*;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -50,11 +55,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.HashSet;
/**
* This class manages the fetching process with the brokers.
@@ -241,7 +246,7 @@ public class Fetcher<K, V> {
else if (strategy == OffsetResetStrategy.LATEST)
timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
else
- throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
+ throw new NoOffsetForPartitionException(partition);
log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
long offset = listOffset(partition, timestamp);
@@ -318,7 +323,7 @@ public class Fetcher<K, V> {
*
* @throws RecordTooLargeException If there is a message larger than the fetch size, which therefore can never be returned
*/
- private void throwIfRecordTooLarge() throws OffsetOutOfRangeException {
+ private void throwIfRecordTooLarge() throws RecordTooLargeException {
Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
this.recordTooLargePartitions.clear();
@@ -576,16 +581,22 @@ public class Fetcher<K, V> {
* Parse the record entry, deserializing the key / value fields if necessary
*/
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
- if (this.checkCrcs)
- logEntry.record().ensureValid();
-
- long offset = logEntry.offset();
- ByteBuffer keyBytes = logEntry.record().key();
- K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
- ByteBuffer valueBytes = logEntry.record().value();
- V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+ try {
+ if (this.checkCrcs)
+ logEntry.record().ensureValid();
+ long offset = logEntry.offset();
+ ByteBuffer keyBytes = logEntry.record().key();
+ K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
+ ByteBuffer valueBytes = logEntry.record().value();
+ V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
+
+ return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, key, value);
+ } catch (KafkaException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
+ }
- return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
}
private static class PartitionRecords<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
deleted file mode 100644
index 4983bc0..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-import org.apache.kafka.common.TopicPartition;
-import java.util.Map;
-
-/**
- * This offset is either larger or smaller than the range of offsets the server has for the given partition.
- *
- */
-public class OffsetOutOfRangeException extends RetriableException {
-
- private static final long serialVersionUID = 1L;
- private Map<TopicPartition, Long> offsetOutOfRangePartitions = null;
-
- public OffsetOutOfRangeException() {
- }
-
- public OffsetOutOfRangeException(Map<TopicPartition, Long> offsetOutOfRangePartitions) {
- this.offsetOutOfRangePartitions = offsetOutOfRangePartitions;
- }
-
- public OffsetOutOfRangeException(String message) {
- super(message);
- }
-
- public OffsetOutOfRangeException(Throwable cause) {
- super(cause);
- }
-
- public OffsetOutOfRangeException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public Map<TopicPartition, Long> offsetOutOfRangePartitions() {
- return offsetOutOfRangePartitions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 516e50b..dd4f39c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -34,7 +34,7 @@ public enum Errors {
UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
NONE(0, null),
OFFSET_OUT_OF_RANGE(1,
- new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+ new ApiException("The requested offset is not within the range of offsets maintained by the server.")),
CORRUPT_MESSAGE(2,
new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3,
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8e47fc3..86ac6b3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -796,7 +797,37 @@ public class ConsumerCoordinatorTest {
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
- @Test(expected = ApiException.class)
+ @Test(expected = CommitFailedException.class)
+ public void testCommitOffsetIllegalGeneration() {
+ // we cannot retry if a rebalance occurs before the commit completed
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+ }
+
+ @Test(expected = CommitFailedException.class)
+ public void testCommitOffsetUnknownMemberId() {
+ // we cannot retry if a rebalance occurs before the commit completed
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+ }
+
+ @Test(expected = CommitFailedException.class)
+ public void testCommitOffsetRebalanceInProgress() {
+ // we cannot retry if a rebalance occurs before the commit completed
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+ }
+
+ @Test(expected = KafkaException.class)
public void testCommitOffsetSyncCallbackWithNonRetriableException() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7b1e4cb..fe9a6aa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 900830f..4d69840 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -288,7 +288,7 @@ class GroupCoordinator(val brokerId: Int,
if (errorCode != Errors.NONE.code) {
resetAndPropagateAssignmentError(group, errorCode)
maybePrepareRebalance(group)
- } else if (group.is(AwaitingSync)) {
+ } else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 0c8333f..f6b8103 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -402,7 +402,8 @@ class GroupMetadataManager(val brokerId: Int,
val groupId = baseKey.key.asInstanceOf[String]
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
- addGroup(groupId, groupMetadata)
+ if (groupMetadata != null)
+ addGroup(groupId, groupMetadata)
}
currOffset = msgAndOffset.nextOffset
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f2b0f85..029eaf4 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -110,7 +110,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
} catch {
// TODO: should be no need to catch these exceptions once KAFKA-2017 is
// merged since coordinator fail-over will not cause a rebalance
- case _: UnknownMemberIdException | _: IllegalGenerationException =>
+ case _: CommitFailedException =>
}
}
scheduler.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/27d44afe/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 2e7471c..93bb229 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,11 +15,11 @@ package kafka.api
import java.util.regex.Pattern
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig, RoundRobinAssignor}
+import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException}
+import org.apache.kafka.common.errors.RecordTooLargeException
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable.Buffer