You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by lu...@apache.org on 2024/01/09 08:24:40 UTC

(kafka) branch 3.7 updated: KAFKA-16097: Disable state updater in 3.7 (#15146)

This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 01ecb1ab48f KAFKA-16097: Disable state updater in 3.7 (#15146)
01ecb1ab48f is described below

commit 01ecb1ab48f4b5aa949cc295e69f11baa1751bbf
Author: Lucas Brutschy <lb...@confluent.io>
AuthorDate: Tue Jan 9 09:24:33 2024 +0100

    KAFKA-16097: Disable state updater in 3.7 (#15146)
    
    Several problems are still appearing while running 3.7 with
    the state updater. This change will disable the state updater
    by default.
---
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java   | 2 +-
 .../org/apache/kafka/streams/integration/EosIntegrationTest.java    | 1 +
 .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++---
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0f549aaa74e..da0b1d6f8a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1226,7 +1226,7 @@ public class StreamsConfig extends AbstractConfig {
         public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
 
         public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
-            return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
+            return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false);
         }
         
         // Private API to enable processing threads (i.e. polling is decoupled from processing)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 688693dbcbf..d06613dd9a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -1119,6 +1119,7 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
+        properties.put(InternalConfig.STATE_UPDATER_ENABLED, processingThreadsEnabled);
         properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 0e69a9482b7..095c44510a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -464,10 +464,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
             assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
         } else {
             if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
-                    || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
-                assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
-            } else {
+                    || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
                 assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+            } else {
+                assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
             }
         }
     }