You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/08 16:44:49 UTC
kafka git commit: KAFKA-4839; Throw NoOffsetForPartitionException from poll once for all TopicPartitions affected
Repository: kafka
Updated Branches:
refs/heads/trunk 257ad524d -> 2554a8dd4
KAFKA-4839; Throw NoOffsetForPartitionException from poll once for all TopicPartitions affected
Signed-off-by: radai-rosenblatt <radai.rosenblattgmail.com>
Author: radai-rosenblatt <ra...@gmail.com>
Reviewers: Apurva Mehta <ap...@confluent.io>, Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2637 from radai-rosenblatt/KAFKA-4839
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2554a8dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2554a8dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2554a8dd
Branch: refs/heads/trunk
Commit: 2554a8dd4dd07b0ac844839b51533bd1e67eed85
Parents: 257ad52
Author: radai-rosenblatt <ra...@gmail.com>
Authored: Mon May 8 09:42:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 8 09:42:44 2017 -0700
----------------------------------------------------------------------
.../consumer/NoOffsetForPartitionException.java | 25 ++++++++++++++++----
.../clients/consumer/internals/Fetcher.java | 22 ++++++++++-------
.../clients/consumer/internals/FetcherTest.java | 15 ++++++++++++
3 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 14bb710..375cda2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.TopicPartition;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
/**
@@ -29,19 +31,34 @@ public class NoOffsetForPartitionException extends InvalidOffsetException {
private static final long serialVersionUID = 1L;
- private final TopicPartition partition;
+ private final Set<TopicPartition> partitions;
public NoOffsetForPartitionException(TopicPartition partition) {
super("Undefined offset with no reset policy for partition: " + partition);
- this.partition = partition;
+ this.partitions = Collections.singleton(partition);
}
+ public NoOffsetForPartitionException(Collection<TopicPartition> partitions) {
+ super("Undefined offset with no reset policy for partitions: " + partitions);
+ this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
+ }
+
+ /**
+ * returns the first partition (out of {@link #partitions()}) for which no offset is defined.
+ * @deprecated please use {@link #partitions()}
+ * @return a partition with no offset
+ */
+ @Deprecated
public TopicPartition partition() {
- return partition;
+ return partitions.isEmpty() ? null : partitions.iterator().next();
}
+ /**
+ * returns all partitions for which no offsets are defined.
+ * @return all partitions without offsets
+ */
public Set<TopicPartition> partitions() {
- return Collections.singleton(partition);
+ return partitions;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc6c338..bf5df95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -382,16 +382,17 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
return client.send(node, request);
}
- private long offsetResetStrategyTimestamp(final TopicPartition partition) {
+ private void offsetResetStrategyTimestamp(
+ final TopicPartition partition,
+ final Map<TopicPartition, Long> output,
+ final Set<TopicPartition> partitionsWithNoOffsets) {
OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
- final long timestamp;
if (strategy == OffsetResetStrategy.EARLIEST)
- timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
+ output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP);
else if (strategy == OffsetResetStrategy.LATEST)
- timestamp = endTimestamp();
+ output.put(partition, endTimestamp());
else
- throw new NoOffsetForPartitionException(partition);
- return timestamp;
+ partitionsWithNoOffsets.add(partition);
}
/**
@@ -402,14 +403,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
*/
private void resetOffsets(final Set<TopicPartition> partitions) {
final Map<TopicPartition, Long> offsetResets = new HashMap<>();
+ final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
for (final TopicPartition partition : partitions) {
- offsetResets.put(partition, offsetResetStrategyTimestamp(partition));
+ offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets);
}
final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false);
for (final TopicPartition partition : partitions) {
final OffsetData offsetData = offsetsByTimes.get(partition);
if (offsetData == null) {
- throw new NoOffsetForPartitionException(partition);
+ partitionsWithNoOffsets.add(partition);
+ continue;
}
// we might lose the assignment while fetching the offset, so check it is still active
if (subscriptions.isAssigned(partition)) {
@@ -417,6 +420,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.subscriptions.seek(partition, offsetData.offset);
}
}
+ if (!partitionsWithNoOffsets.isEmpty()) {
+ throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
+ }
}
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0a0f3d9..4b957a3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
@@ -780,6 +781,20 @@ public class FetcherTest {
}
@Test
+ public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
+ Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+ subscriptionsNoAutoReset.assignFromUser(tps);
+ try {
+ fetcherNoAutoReset.updateFetchPositions(tps);
+ fail("Should have thrown NoOffsetForPartitionException");
+ } catch (NoOffsetForPartitionException e) {
+ // we expect the exception to be thrown for both TPs at the same time
+ Set<TopicPartition> partitions = e.partitions();
+ assertEquals(tps, partitions);
+ }
+ }
+
+ @Test
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present