You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/10/18 23:04:15 UTC

[kafka] branch 3.3 updated: KAFKA-14316; Fix feature control iterator metadata version handling (#12765)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 5cd9c9bf1df KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
5cd9c9bf1df is described below

commit 5cd9c9bf1df34d60d4d147bfb77e220fb76467f7
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 18 15:30:45 2022 -0700

    KAFKA-14316; Fix feature control iterator metadata version handling (#12765)
    
    `FeatureControlIterator.hasNext()` checks two conditions: 1) whether the metadata version has not yet been written, and 2) whether the underlying iterator has additional records. However, `next()` additionally checks that the metadata version is at least the minimum bootstrap version before writing it to the log. When that check fails and the underlying iterator is empty, `hasNext()` still returns true, but `next()` has nothing to return and throws an unexpected `NoSuchElementException`. This patch makes `hasNext()` apply the same metadata version check as `next()`.
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>
---
 .../kafka/controller/FeatureControlManager.java     | 21 ++++++++++++---------
 .../kafka/controller/FeatureControlManagerTest.java | 18 ++++++++++++++++++
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 274e28906bf..e596b705f45 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -320,22 +320,25 @@ public class FeatureControlManager {
 
         @Override
         public boolean hasNext() {
-            return !wroteVersion || iterator.hasNext();
+            return needsWriteMetadataVersion() || iterator.hasNext();
+        }
+
+        private boolean needsWriteMetadataVersion() {
+            return !wroteVersion && metadataVersion.isAtLeast(minimumBootstrapVersion);
         }
 
         @Override
         public List<ApiMessageAndVersion> next() {
             // Write the metadata.version first
-            if (!wroteVersion) {
-                if (metadataVersion.isAtLeast(minimumBootstrapVersion)) {
-                    wroteVersion = true;
-                    return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
-                            .setName(MetadataVersion.FEATURE_NAME)
-                            .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
-                }
+            if (needsWriteMetadataVersion()) {
+                wroteVersion = true;
+                return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
+                    .setName(MetadataVersion.FEATURE_NAME)
+                    .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
             }
+
             // Then write the rest of the features
-            if (!hasNext()) throw new NoSuchElementException();
+            if (!iterator.hasNext()) throw new NoSuchElementException();
             Entry<String, Short> entry = iterator.next();
             return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
                 .setName(entry.getKey())
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 3845f646668..2478f4ce164 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -201,6 +201,24 @@ public class FeatureControlManagerTest {
         );
     }
 
+    @Test
+    public void testFeatureControlIteratorWithOldMetadataVersion() throws Exception {
+        // We require minimum of IBP_3_3_IV0 to write metadata version in the snapshot.
+
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        FeatureControlManager manager = new FeatureControlManager.Builder()
+            .setLogContext(logContext)
+            .setSnapshotRegistry(snapshotRegistry)
+            .setMetadataVersion(MetadataVersion.IBP_3_2_IV0)
+            .build();
+
+        RecordTestUtils.assertBatchIteratorContains(
+            Collections.emptyList(),
+            manager.iterator(Long.MAX_VALUE)
+        );
+    }
+
     @Test
     public void testFeatureControlIterator() throws Exception {
         LogContext logContext = new LogContext();