You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/08 16:55:00 UTC
[kafka] branch 2.4 updated: MINOR: Return null in key mapping of
committed (#7659)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 9be77e1 MINOR: Return null in key mapping of committed (#7659)
9be77e1 is described below
commit 9be77e184e8f3f6a6df8b640102ecbdaf612eb71
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Nov 8 08:53:38 2019 -0800
MINOR: Return null in key mapping of committed (#7659)
To be consistent with other grouping APIs, and also modified callers accordingly.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/consumer/KafkaConsumer.java | 15 +++++------
.../consumer/internals/ConsumerCoordinator.java | 16 ++++++-----
.../kafka/clients/consumer/KafkaConsumerTest.java | 31 ++++++++++++++++++++++
.../kafka/api/PlaintextConsumerTest.scala | 5 ++--
.../integration/kafka/api/TransactionsTest.scala | 2 +-
.../kafka/admin/ConsumerGroupCommandTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../streams/processor/internals/StandbyTask.java | 10 ++-----
.../streams/processor/internals/StreamTask.java | 4 ++-
9 files changed, 58 insertions(+), 29 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5a5cad8..7c6023d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1796,8 +1796,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
- * Partitions that do not have a committed offset would not be included in the returned map.
- * <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will do a remote call to get the latest committed offsets from the server, and will block until the
@@ -1806,8 +1804,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partitions to check
- * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
- * would not be included in the returned result
+ * @return The latest committed offsets for the given partitions; {@code null} will be returned for the
+ * partition if there is no such message.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -1828,16 +1826,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
- * Partitions that do not have a committed offset would not be included in the returned map.
- * <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partitions The partitions to check
* @param timeout The maximum amount of time to await the latest committed offsets
- * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
- * would not be included in the returned result
+ * @return The latest committed offsets for the given partitions; {@code null} will be returned for the
+ * partition if there is no such message.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
@@ -2398,7 +2394,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
- offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
+ if (offsetAndMetadata != null)
+ offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
// Visible for testing
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 142b1a1..3f8d81d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -753,14 +753,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
final TopicPartition tp = entry.getKey();
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
- final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
- final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
+ if (offsetAndMetadata != null) {
+ final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp);
+ final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
leaderAndEpoch);
- log.info("Setting offset for partition {} to the committed offset {}", tp, position);
- entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
- this.subscriptions.seekUnvalidated(tp, position);
+ log.info("Setting offset for partition {} to the committed offset {}", tp, position);
+ entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
+ this.subscriptions.seekUnvalidated(tp, position);
+ }
}
return true;
}
@@ -1232,10 +1234,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return;
}
} else if (data.offset >= 0) {
- // record the position with the offset (-1 indicates no committed offset to fetch)
+ // record the position with the offset (-1 indicates no committed offset to fetch);
+ // if there's no committed offset, record as null
offsets.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
} else {
log.info("Found no committed offset for partition {}", tp);
+ offsets.put(tp, null);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e939d96..1fd9fb3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -720,6 +720,37 @@ public class KafkaConsumerTest {
}
@Test
+ public void testNoCommittedOffsets() {
+ long offset1 = 10000;
+
+ Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+
+ ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+ KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
+ consumer.assign(Arrays.asList(tp0, tp1));
+
+ // lookup coordinator
+ client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+ // fetch offset for one topic
+ client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0, offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator);
+ final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Utils.mkSet(tp0, tp1));
+ assertEquals(2, committed.size());
+ assertEquals(offset1, committed.get(tp0).offset());
+ assertNull(committed.get(tp1));
+
+ consumer.close(Duration.ofMillis(0));
+ }
+
+ @Test
public void testAutoCommitSentBeforePositionUpdate() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 62b358e..9df468a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -622,12 +622,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val producer = createProducer()
sendRecords(producer, numRecords = 5, tp)
+ val topicPartition = new TopicPartition(topic, 15)
val consumer = createConsumer()
- assertTrue(consumer.committed(Set(new TopicPartition(topic, 15)).asJava).isEmpty)
+ assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalStateException] {
- consumer.position(new TopicPartition(topic, 15))
+ consumer.position(topicPartition)
}
consumer.assign(List(tp).asJava)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index e1bf3db..63bfc9f 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val producer2 = transactionalProducers(1)
producer2.initTransactions()
- assertEquals(offsetAndMetadata, consumer.committed(Set(tp).asJava).get(tp))
+ TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
}
@Test
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index a1f58d4..97b638f 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -73,7 +73,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
.asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}
- consumer.committed(partitions.asJava).asScala.mapValues(_.offset()).toMap
+ consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap
} finally {
consumer.close()
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2e8afe3..a7421d7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1443,7 +1443,7 @@ object TestUtils extends Logging {
}
def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
- val committed = consumer.committed(consumer.assignment).asScala.mapValues(_.offset)
+ val committed = consumer.committed(consumer.assignment).asScala.filter(_._2 != null).mapValues(_.offset)
consumer.assignment.asScala.foreach { topicPartition =>
if (committed.contains(topicPartition))
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index fc1ff4f..251d99f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -222,15 +222,9 @@ public class StandbyTask extends AbstractTask {
private Map<TopicPartition, Long> committedOffsetForPartitions(final Set<TopicPartition> partitions) {
try {
- final Map<TopicPartition, Long> results = consumer.committed(partitions)
- .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
-
// those do not have a committed offset would default to 0
- for (final TopicPartition tp : partitions) {
- results.putIfAbsent(tp, 0L);
- }
-
- return results;
+ return consumer.committed(partitions).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
} catch (final AuthorizationException e) {
throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partitions), e);
} catch (final WakeupException e) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f167e1a..fcab9f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -257,7 +257,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
@Override
public void initializeMetadata() {
try {
- final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = consumer.committed(partitions);
+ final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = consumer.committed(partitions).entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
initializeCommittedOffsets(offsetsAndMetadata);
initializeTaskTime(offsetsAndMetadata);
} catch (final AuthorizationException e) {