Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/08 16:55:00 UTC

[kafka] branch 2.4 updated: MINOR: Return null in key mapping of committed (#7659)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 9be77e1  MINOR: Return null in key mapping of committed (#7659)
9be77e1 is described below

commit 9be77e184e8f3f6a6df8b640102ecbdaf612eb71
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Nov 8 08:53:38 2019 -0800

    MINOR: Return null in key mapping of committed (#7659)
    
    Partitions without a committed offset now map to null in the returned result, to be
    consistent with other grouping APIs; callers have been modified accordingly.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 15 +++++------
 .../consumer/internals/ConsumerCoordinator.java    | 16 ++++++-----
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 31 ++++++++++++++++++++++
 .../kafka/api/PlaintextConsumerTest.scala          |  5 ++--
 .../integration/kafka/api/TransactionsTest.scala   |  2 +-
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 .../streams/processor/internals/StandbyTask.java   | 10 ++-----
 .../streams/processor/internals/StreamTask.java    |  4 ++-
 9 files changed, 58 insertions(+), 29 deletions(-)
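
In caller-facing terms, committed() now returns an entry for every requested partition,
mapping to null where no offset has been committed, instead of omitting that partition
from the result. A minimal null-safe sketch of the new contract (the broker address,
topic, group id, and class name are illustrative, not taken from this commit):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedNullCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative
            props.put("group.id", "example-group");           // illustrative
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("example-topic", 0); // illustrative
                consumer.assign(Collections.singleton(tp)); // seek() requires assignment
                Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
                OffsetAndMetadata om = committed.get(tp);
                if (om == null) {
                    // as of this change: tp is present in the map but has no committed offset
                } else {
                    consumer.seek(tp, om.offset()); // resume from the committed offset
                }
            }
        }
    }

Before this change, the equivalent check was containsKey(tp), since partitions without a
committed offset were simply absent from the map.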

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5a5cad8..7c6023d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1796,8 +1796,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Get the last committed offsets for the given partitions (whether the commit happened by this process or
      * another). The returned offsets will be used as the position for the consumer in the event of a failure.
      * <p>
-     * Partitions that do not have a committed offset would not be included in the returned map.
-     * <p>
      * If any of the partitions requested do not exist, an exception would be thrown.
      * <p>
      * This call will do a remote call to get the latest committed offsets from the server, and will block until the
@@ -1806,8 +1804,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      *
      * @param partitions The partitions to check
-     * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
-     *         would not be included in the returned result
+     * @return The latest committed offsets for the given partitions; {@code null} will be returned for the
+     *         partition if there is no committed offset.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1828,16 +1826,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Get the last committed offsets for the given partitions (whether the commit happened by this process or
      * another). The returned offsets will be used as the position for the consumer in the event of a failure.
      * <p>
-     * Partitions that do not have a committed offset would not be included in the returned map.
-     * <p>
      * If any of the partitions requested do not exist, an exception would be thrown.
      * <p>
      * This call will block to do a remote call to get the latest committed offsets from the server.
      *
      * @param partitions The partitions to check
      * @param timeout  The maximum amount of time to await the latest committed offsets
-     * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
-     *         would not be included in the returned result
+     * @return The latest committed offsets for the given partitions; {@code null} will be returned for the
+     *         partition if there is no committed offset.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -2398,7 +2394,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
-        offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+        if (offsetAndMetadata != null)
+            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
     }
 
     // Visible for testing
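
The same contract applies to the timed overload documented in the second hunk above,
where blocking is bounded by the caller-supplied timeout instead of
default.api.timeout.ms. A sketch of a null-tolerant lookup under that overload (the
helper name and timeout value are illustrative):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    class TimedCommittedLookup {
        // hypothetical helper: the committed offset for tp, or null if none exists
        static OffsetAndMetadata committedOrNull(Consumer<?, ?> consumer, TopicPartition tp) {
            try {
                Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp), Duration.ofSeconds(5));
                return committed.get(tp); // null here now means "no committed offset"
            } catch (TimeoutException e) {
                throw new RuntimeException("offset lookup did not complete in time", e);
            }
        }
    }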
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 142b1a1..3f8d81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -753,14 +753,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
             final TopicPartition tp = entry.getKey();
             final OffsetAndMetadata offsetAndMetadata = entry.getValue();
-            final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
-            final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
+            if (offsetAndMetadata != null) {
+                final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
+                final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                     offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
                     leaderAndEpoch);
 
-            log.info("Setting offset for partition {} to the committed offset {}", tp, position);
-            entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
-            this.subscriptions.seekUnvalidated(tp, position);
+                log.info("Setting offset for partition {} to the committed offset {}", tp, position);
+                entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
+                this.subscriptions.seekUnvalidated(tp, position);
+            }
         }
         return true;
     }
@@ -1232,10 +1234,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         return;
                     }
                 } else if (data.offset >= 0) {
-                    // record the position with the offset (-1 indicates no committed offset to fetch)
+                    // record the position with the fetched offset; a negative offset (-1)
+                    // from the broker means there is no committed offset, recorded as null below
                     offsets.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
                 } else {
                     log.info("Found no committed offset for partition {}", tp);
+                    offsets.put(tp, null);
                 }
             }
 
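Taken together, the two hunks above mean the coordinator records null for partitions
with no committed offset and then skips those entries when seeding fetch positions,
leaving such partitions to the auto.offset.reset policy. Call sites that want the
pre-change shape of the result, a map containing only partitions that actually have
committed offsets, can filter the nulls out; this is the pattern the Streams and test
changes below adopt. A standalone sketch of that filter:

    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommittedFilter {
        // drop null values, restoring a map of only the partitions with committed offsets;
        // filtering first also avoids Collectors.toMap's NullPointerException on null values
        static Map<TopicPartition, OffsetAndMetadata> withoutNulls(
                Map<TopicPartition, OffsetAndMetadata> committed) {
            return committed.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }
    }
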
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e939d96..1fd9fb3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -720,6 +720,37 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void testNoCommittedOffsets() {
+        long offset1 = 10000;
+
+        Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+
+        ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+        KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+        consumer.assign(Arrays.asList(tp0, tp1));
+
+        // lookup coordinator
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // fetch offsets for both partitions of the topic: tp0 has a committed offset, tp1 does not (-1)
+        client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0, offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator);
+        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Utils.mkSet(tp0, tp1));
+        assertEquals(2, committed.size());
+        assertEquals(offset1, committed.get(tp0).offset());
+        assertNull(committed.get(tp1));
+
+        consumer.close(Duration.ofMillis(0));
+    }
+
+    @Test
     public void testAutoCommitSentBeforePositionUpdate() {
         Time time = new MockTime();
         SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 62b358e..9df468a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -622,12 +622,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val producer = createProducer()
     sendRecords(producer, numRecords = 5, tp)
 
+    val topicPartition = new TopicPartition(topic, 15)
     val consumer = createConsumer()
-    assertTrue(consumer.committed(Set(new TopicPartition(topic, 15)).asJava).isEmpty)
+    assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
 
     // position() on a partition that we aren't subscribed to throws an exception
     intercept[IllegalStateException] {
-      consumer.position(new TopicPartition(topic, 15))
+      consumer.position(topicPartition)
     }
 
     consumer.assign(List(tp).asJava)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index e1bf3db..63bfc9f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     val producer2 = transactionalProducers(1)
     producer2.initTransactions()
 
-    assertEquals(offsetAndMetadata, consumer.committed(Set(tp).asJava).get(tp))
+    TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index a1f58d4..97b638f 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -73,7 +73,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
       val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
         .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}
 
-      consumer.committed(partitions.asJava).asScala.mapValues(_.offset()).toMap
+      consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap
     } finally {
       consumer.close()
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2e8afe3..a7421d7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1443,7 +1443,7 @@ object TestUtils extends Logging {
   }
 
   def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
-    val committed = consumer.committed(consumer.assignment).asScala.mapValues(_.offset)
+    val committed = consumer.committed(consumer.assignment).asScala.filter(_._2 != null).mapValues(_.offset)
 
     consumer.assignment.asScala.foreach { topicPartition =>
       if (committed.contains(topicPartition))
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index fc1ff4f..251d99f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -222,15 +222,9 @@ public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> committedOffsetForPartitions(final Set<TopicPartition> partitions) {
         try {
-            final Map<TopicPartition, Long> results = consumer.committed(partitions)
-                .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
-
             // partitions that do not have a committed offset default to 0
-            for (final TopicPartition tp : partitions) {
-                results.putIfAbsent(tp, 0L);
-            }
-
-            return results;
+            return consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
         } catch (final AuthorizationException e) {
             throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partitions), e);
         } catch (final WakeupException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f167e1a..fcab9f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -257,7 +257,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     @Override
     public void initializeMetadata() {
         try {
-            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = consumer.committed(partitions);
+            final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = consumer.committed(partitions).entrySet().stream()
+                .filter(e -> e.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
             initializeCommittedOffsets(offsetsAndMetadata);
             initializeTaskTime(offsetsAndMetadata);
         } catch (final AuthorizationException e) {