You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/08 16:44:49 UTC

kafka git commit: KAFKA-4839; Throw NoOffsetForPartitionException from poll once for all TopicPartitions affected

Repository: kafka
Updated Branches:
  refs/heads/trunk 257ad524d -> 2554a8dd4


KAFKA-4839; Throw NoOffsetForPartitionException from poll once for all TopicPartitions affected

Signed-off-by: radai-rosenblatt <radai.rosenblatt@gmail.com>

Author: radai-rosenblatt <ra...@gmail.com>

Reviewers: Apurva Mehta <ap...@confluent.io>, Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2637 from radai-rosenblatt/KAFKA-4839


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2554a8dd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2554a8dd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2554a8dd

Branch: refs/heads/trunk
Commit: 2554a8dd4dd07b0ac844839b51533bd1e67eed85
Parents: 257ad52
Author: radai-rosenblatt <ra...@gmail.com>
Authored: Mon May 8 09:42:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon May 8 09:42:44 2017 -0700

----------------------------------------------------------------------
 .../consumer/NoOffsetForPartitionException.java | 25 ++++++++++++++++----
 .../clients/consumer/internals/Fetcher.java     | 22 ++++++++++-------
 .../clients/consumer/internals/FetcherTest.java | 15 ++++++++++++
 3 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
index 14bb710..375cda2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -29,19 +31,34 @@ public class NoOffsetForPartitionException extends InvalidOffsetException {
 
     private static final long serialVersionUID = 1L;
 
-    private final TopicPartition partition;
+    private final Set<TopicPartition> partitions;
 
     public NoOffsetForPartitionException(TopicPartition partition) {
         super("Undefined offset with no reset policy for partition: " + partition);
-        this.partition = partition;
+        this.partitions = Collections.singleton(partition);
     }
 
+    public NoOffsetForPartitionException(Collection<TopicPartition> partitions) {
+        super("Undefined offset with no reset policy for partitions: " + partitions);
+        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
+    }
+
+    /**
+     * returns the first partition (out of {@link #partitions()}) for which no offset is defined.
+     * @deprecated please use {@link #partitions()}
+     * @return a partition with no offset
+     */
+    @Deprecated
     public TopicPartition partition() {
-        return partition;
+        return partitions.isEmpty() ? null : partitions.iterator().next();
     }
 
+    /**
+     * returns all partitions for which no offsets are defined.
+     * @return all partitions without offsets
+     */
     public Set<TopicPartition> partitions() {
-        return Collections.singleton(partition);
+        return partitions;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc6c338..bf5df95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -382,16 +382,17 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             return client.send(node, request);
     }
 
-    private long offsetResetStrategyTimestamp(final TopicPartition partition) {
+    private void offsetResetStrategyTimestamp(
+            final TopicPartition partition,
+            final Map<TopicPartition, Long> output,
+            final Set<TopicPartition> partitionsWithNoOffsets) {
         OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
-        final long timestamp;
         if (strategy == OffsetResetStrategy.EARLIEST)
-            timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
+            output.put(partition, ListOffsetRequest.EARLIEST_TIMESTAMP);
         else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = endTimestamp();
+            output.put(partition, endTimestamp());
         else
-            throw new NoOffsetForPartitionException(partition);
-        return timestamp;
+            partitionsWithNoOffsets.add(partition);
     }
 
     /**
@@ -402,14 +403,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      */
     private void resetOffsets(final Set<TopicPartition> partitions) {
         final Map<TopicPartition, Long> offsetResets = new HashMap<>();
+        final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
         for (final TopicPartition partition : partitions) {
-            offsetResets.put(partition, offsetResetStrategyTimestamp(partition));
+            offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets);
         }
         final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false);
         for (final TopicPartition partition : partitions) {
             final OffsetData offsetData = offsetsByTimes.get(partition);
             if (offsetData == null) {
-                throw new NoOffsetForPartitionException(partition);
+                partitionsWithNoOffsets.add(partition);
+                continue;
             }
             // we might lose the assignment while fetching the offset, so check it is still active
             if (subscriptions.isAssigned(partition)) {
@@ -417,6 +420,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 this.subscriptions.seek(partition, offsetData.offset);
             }
         }
+        if (!partitionsWithNoOffsets.isEmpty()) {
+            throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
+        }
     }
 
     public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,

http://git-wip-us.apache.org/repos/asf/kafka/blob/2554a8dd/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0a0f3d9..4b957a3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
@@ -780,6 +781,20 @@ public class FetcherTest {
     }
 
     @Test
+    public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        subscriptionsNoAutoReset.assignFromUser(tps);
+        try {
+            fetcherNoAutoReset.updateFetchPositions(tps);
+            fail("Should have thrown NoOffsetForPartitionException");
+        } catch (NoOffsetForPartitionException e) {
+            // we expect the exception to be thrown for both TPs at the same time
+            Set<TopicPartition> partitions = e.partitions();
+            assertEquals(tps, partitions);
+        }
+    }
+
+    @Test
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present