You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2024/01/17 01:25:39 UTC

(kafka) branch trunk updated: KAFKA-16139: Fix StreamsUpgradeTest (#15199)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e563aad4eec KAFKA-16139: Fix StreamsUpgradeTest (#15199)
e563aad4eec is described below

commit e563aad4eec2e08c8db54e1afebe28c746130ba4
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Jan 17 02:25:33 2024 +0100

    KAFKA-16139: Fix StreamsUpgradeTest (#15199)
    
    Adds version 3.5 to the possible values for config upgrade_from.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java     | 4 +++-
 .../java/org/apache/kafka/streams/internals/UpgradeFromValues.java    | 3 ++-
 .../streams/processor/internals/assignment/AssignorConfiguration.java | 2 ++
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0f549aaa74e..f8f84e9e9c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -71,6 +71,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 import static org.apache.kafka.common.config.ConfigDef.parseType;
+import static org.apache.kafka.streams.internals.UpgradeFromValues.UPGRADE_FROM_35;
 
 /**
  * Configuration for a {@link KafkaStreams} instance.
@@ -762,7 +763,8 @@ public class StreamsConfig extends AbstractConfig {
         UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
         UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
         UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
-        UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\" (for upgrading from the corresponding old version).";
+        UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" +
+        UPGRADE_FROM_35 + "\" (for upgrading from the corresponding old version).";
 
     /** {@code windowstore.changelog.additional.retention.ms} */
     @SuppressWarnings("WeakerAccess")
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
index a5ae71a33d4..e803e918701 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
@@ -36,7 +36,8 @@ public enum UpgradeFromValues {
     UPGRADE_FROM_31("3.1"),
     UPGRADE_FROM_32("3.2"),
     UPGRADE_FROM_33("3.3"),
-    UPGRADE_FROM_34("3.4");
+    UPGRADE_FROM_34("3.4"),
+    UPGRADE_FROM_35("3.5");
 
     private final String value;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index 02a0cb8bab9..76e824776e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -125,6 +125,7 @@ public final class AssignorConfiguration {
                 case UPGRADE_FROM_32:
                 case UPGRADE_FROM_33:
                 case UPGRADE_FROM_34:
+                case UPGRADE_FROM_35:
                     // we need to add new version when new "upgrade.from" values become available
 
                     // This config is for explicitly sending FK response to a requested partition
@@ -183,6 +184,7 @@ public final class AssignorConfiguration {
                 case UPGRADE_FROM_32:
                 case UPGRADE_FROM_33:
                 case UPGRADE_FROM_34:
+                case UPGRADE_FROM_35:
                     // we need to add new version when new "upgrade.from" values become available
 
                     // This config is for explicitly sending FK response to a requested partition