You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/10/18 23:04:15 UTC
[kafka] branch 3.3 updated: KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 5cd9c9bf1df KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
5cd9c9bf1df is described below
commit 5cd9c9bf1df34d60d4d147bfb77e220fb76467f7
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 18 15:30:45 2022 -0700
KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
`FeatureControlIterator.hasNext()` checks two conditions: 1) whether we have already written the metadata version, and 2) whether the underlying iterator has additional records. However, `next()` additionally checks that the metadata version is high enough to be included in the log. When that check fails, `hasNext()` still returns true even though no metadata version record will be written, so `next()` falls through to the underlying iterator and throws an unexpected `NoSuchElementException` if that iterator is empty.
Reviewers: Colin Patrick McCabe <cm...@apache.org>
---
.../kafka/controller/FeatureControlManager.java | 21 ++++++++++++---------
.../kafka/controller/FeatureControlManagerTest.java | 18 ++++++++++++++++++
2 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 274e28906bf..e596b705f45 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -320,22 +320,25 @@ public class FeatureControlManager {
@Override
public boolean hasNext() {
- return !wroteVersion || iterator.hasNext();
+ return needsWriteMetadataVersion() || iterator.hasNext();
+ }
+
+ private boolean needsWriteMetadataVersion() {
+ return !wroteVersion && metadataVersion.isAtLeast(minimumBootstrapVersion);
}
@Override
public List<ApiMessageAndVersion> next() {
// Write the metadata.version first
- if (!wroteVersion) {
- if (metadataVersion.isAtLeast(minimumBootstrapVersion)) {
- wroteVersion = true;
- return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
- .setName(MetadataVersion.FEATURE_NAME)
- .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
- }
+ if (needsWriteMetadataVersion()) {
+ wroteVersion = true;
+ return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
}
+
// Then write the rest of the features
- if (!hasNext()) throw new NoSuchElementException();
+ if (!iterator.hasNext()) throw new NoSuchElementException();
Entry<String, Short> entry = iterator.next();
return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
.setName(entry.getKey())
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 3845f646668..2478f4ce164 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -201,6 +201,24 @@ public class FeatureControlManagerTest {
);
}
+ @Test
+ public void testFeatureControlIteratorWithOldMetadataVersion() throws Exception {
+ // We require minimum of IBP_3_3_IV0 to write metadata version in the snapshot.
+
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ FeatureControlManager manager = new FeatureControlManager.Builder()
+ .setLogContext(logContext)
+ .setSnapshotRegistry(snapshotRegistry)
+ .setMetadataVersion(MetadataVersion.IBP_3_2_IV0)
+ .build();
+
+ RecordTestUtils.assertBatchIteratorContains(
+ Collections.emptyList(),
+ manager.iterator(Long.MAX_VALUE)
+ );
+ }
+
@Test
public void testFeatureControlIterator() throws Exception {
LogContext logContext = new LogContext();