You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by lu...@apache.org on 2024/01/09 08:24:40 UTC
(kafka) branch 3.7 updated: KAFKA-16097: Disable state updater in 3.7 (#15146)
This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new 01ecb1ab48f KAFKA-16097: Disable state updater in 3.7 (#15146)
01ecb1ab48f is described below
commit 01ecb1ab48f4b5aa949cc295e69f11baa1751bbf
Author: Lucas Brutschy <lb...@confluent.io>
AuthorDate: Tue Jan 9 09:24:33 2024 +0100
KAFKA-16097: Disable state updater in 3.7 (#15146)
Several problems still appear when running 3.7 with
the state updater enabled. This change disables the state updater
by default.
---
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../org/apache/kafka/streams/integration/EosIntegrationTest.java | 1 +
.../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++---
3 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0f549aaa74e..da0b1d6f8a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1226,7 +1226,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
public static boolean getStateUpdaterEnabled(final Map<String, Object> configs) {
- return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true);
+ return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false);
}
// Private API to enable processing threads (i.e. polling is decoupled from processing)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 688693dbcbf..d06613dd9a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -1119,6 +1119,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
+ properties.put(InternalConfig.STATE_UPDATER_ENABLED, processingThreadsEnabled);
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled);
final Properties config = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 0e69a9482b7..095c44510a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -464,10 +464,10 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
} else {
if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
- || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
- assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
- } else {
+ || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+ } else {
+ assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout());
}
}
}