You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/06/28 21:56:34 UTC

[kafka] branch 2.8 updated: KAFKA-12951: restore must terminate for tx global topic (#10894)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new bfb1fdb  KAFKA-12951: restore must terminate for tx global topic (#10894)
bfb1fdb is described below

commit bfb1fdb9f7f9f547b42ded973a4ab68c81617fff
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon Jun 28 14:10:25 2021 -0700

    KAFKA-12951: restore must terminate for tx global topic (#10894)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Luke Chen <sh...@gmail.com>, Gasparina Damien <d....@gmail.com>
---
 .../internals/GlobalStateManagerImpl.java          |  72 ++++------
 .../GlobalKTableEOSIntegrationTest.java            | 146 ++++++++++++++++++---
 .../internals/GlobalStateManagerImplTest.java      |  67 +---------
 3 files changed, 163 insertions(+), 122 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f10c707..08fa43c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -72,7 +72,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
     private final StateRestoreListener stateRestoreListener;
     private InternalProcessorContext globalProcessorContext;
-    private final Duration requestTimeoutPlusTaskTimeout;
+    private final Duration pollMsPlusRequestTimeout;
     private final long taskTimeoutMs;
     private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
     private final OffsetCheckpoint checkpointFile;
@@ -112,9 +112,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         final int requestTimeoutMs = new ClientUtils.QuietConsumerConfig(consumerProps)
             .getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        pollMsPlusRequestTimeout = Duration.ofMillis(
+            config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs
+        );
         taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG);
-        requestTimeoutPlusTaskTimeout =
-            Duration.ofMillis(requestTimeoutMs + taskTimeoutMs);
     }
 
     @Override
@@ -252,6 +253,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                               final String storeName,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
             globalConsumer.assign(Collections.singletonList(topicPartition));
             long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
@@ -260,13 +263,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                 offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
-                offset = retryUntilSuccessOrThrowOnTaskTimeout(
-                    () -> globalConsumer.position(topicPartition),
-                    String.format(
-                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
-                        topicPartition
-                    )
-                );
+                offset = getGlobalConsumerOffset(topicPartition);
             }
 
             final Long highWatermark = highWatermarks.get(topicPartition);
@@ -276,34 +273,19 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
-            while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317)
-                                             // we should update the `poll()` timeout below
-
-                // we ignore `poll.ms` config during bootstrapping phase and
-                // apply `request.timeout.ms` plus `task.timeout.ms` instead
-                //
-                // the reason is, that `poll.ms` might be too short to give a fetch request a fair chance
-                // to actually complete and we don't want to start `task.timeout.ms` too early
-                //
-                // we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code:
-                // if we don't pass it in, we would just track the timeout ourselves and call `poll()` again
-                // in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead
-                //
-                // note that using `request.timeout.ms` provides a conservative upper bound for the timeout;
-                // this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout
-                // delayed is preferable (as it's more robust) than starting it too early
-                //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10315
-                //   -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails
-                //      (instead of letting the consumer retry fetch requests silently)
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
                 //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
-                //      https://issues.apache.org/jira/browse/KAFKA-7380
-                //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
-                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //      `poll(pollMS)` without adding the request timeout and do a more precise
+                //      timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
                 if (records.isEmpty()) {
-                    // this will always throw
-                    maybeUpdateDeadlineOrThrow(time.milliseconds());
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
                 }
 
                 final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
@@ -313,13 +295,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                     }
                 }
 
-                offset = retryUntilSuccessOrThrowOnTaskTimeout(
-                    () -> globalConsumer.position(topicPartition),
-                    String.format(
-                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
-                        topicPartition
-                    )
-                );
+                offset = getGlobalConsumerOffset(topicPartition);
 
                 stateRestoreAdapter.restoreBatch(restoreRecords);
                 stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
@@ -330,6 +306,16 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         }
     }
 
+    private long getGlobalConsumerOffset(final TopicPartition topicPartition) {
+        return retryUntilSuccessOrThrowOnTaskTimeout(
+            () -> globalConsumer.position(topicPartition),
+            String.format(
+                "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                topicPartition
+            )
+        );
+    }
+
     private <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
                                                         final String errorMessage) {
         long deadlineMs = NO_DEADLINE;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 5706f67..00bec9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -16,9 +16,13 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,9 +40,11 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -60,6 +66,7 @@ import java.util.Properties;
 
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
@@ -113,12 +120,14 @@ public class GlobalKTableEOSIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300);
+        streamsConfiguration.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
         globalTable = builder.globalTable(
             globalTableTopic,
             Consumed.with(Serdes.Long(), Serdes.String()),
@@ -155,7 +164,7 @@ public class GlobalKTableEOSIntegrationTest {
 
         TestUtils.waitForCondition(
             () -> results.equals(expected),
-            30000L,
+            30_000L,
             () -> "waiting for initial values;" +
                 "\n  expected: " + expected +
                 "\n  received: " + results
@@ -183,7 +192,7 @@ public class GlobalKTableEOSIntegrationTest {
                 replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value));
                 return globalState.equals(expectedState);
             },
-            30000,
+            30_000L,
             () -> "waiting for data in replicated store" +
                 "\n  expected: " + expectedState +
                 "\n  received: " + globalState
@@ -200,7 +209,7 @@ public class GlobalKTableEOSIntegrationTest {
 
         TestUtils.waitForCondition(
             () -> results.equals(expected),
-            30000L,
+            30_000L,
             () -> "waiting for final values" +
                 "\n  expected: " + expected +
                 "\n  received: " + results
@@ -223,7 +232,7 @@ public class GlobalKTableEOSIntegrationTest {
 
         TestUtils.waitForCondition(
             () -> results.equals(expected),
-            30000L,
+            30_000L,
             () -> "waiting for initial values" +
                 "\n  expected: " + expected +
                 "\n  received: " + results
@@ -251,7 +260,7 @@ public class GlobalKTableEOSIntegrationTest {
                 replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value));
                 return globalState.equals(expectedState);
             },
-            30000,
+            30_000L,
             () -> "waiting for data in replicated store" +
                 "\n  expected: " + expectedState +
                 "\n  received: " + globalState
@@ -268,7 +277,7 @@ public class GlobalKTableEOSIntegrationTest {
 
         TestUtils.waitForCondition(
             () -> results.equals(expected),
-            30000L,
+            30_000L,
             () -> "waiting for final values" +
                 "\n  expected: " + expected +
                 "\n  received: " + results
@@ -302,13 +311,111 @@ public class GlobalKTableEOSIntegrationTest {
                 }
                 return result.equals(expected);
             },
-            30000L,
+            30_000L,
             () -> "waiting for initial values" +
                 "\n  expected: " + expected +
-                "\n  received: " + results
+                "\n  received: " + result
         );
     }
-    
+
+    @Test
+    public void shouldSkipOverTxMarkersOnRestore() throws Exception {
+        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
+    }
+
+    @Test
+    public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
+        shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
+    }
+
+    private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final boolean appendAbortedMessages) throws Exception {
+        // records with key 1L, 2L, and 4L are written into partition-0
+        // record with key 3L is written into partition-1
+        produceInitialGlobalTableValues();
+
+        final String stateDir = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+        final File globalStateDir = new File(
+            stateDir
+                + File.separator
+                + streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
+                + File.separator
+                + "global");
+        assertTrue(globalStateDir.mkdirs());
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint"));
+
+        // set the checkpointed offset to the commit marker of partition-1
+        // even if `poll()` won't return any data for partition-1, we should still finish the restore
+        checkpoint.write(Collections.singletonMap(new TopicPartition(globalTableTopic, 1), 1L));
+
+        if (appendAbortedMessages) {
+            final AtomicReference<Exception> error = new AtomicReference<>();
+            startStreams(new StateRestoreListener() {
+                @Override
+                public void onRestoreStart(final TopicPartition topicPartition,
+                                           final String storeName,
+                                           final long startingOffset,
+                                           final long endingOffset) {
+                    // we need to write aborted messages only after we init the `highWatermark`
+                    // to move the `endOffset` beyond the `highWatermark
+                    //
+                    // we cannot write committed messages because we want to test the case that
+                    // poll() returns no records
+                    //
+                    // cf. GlobalStateManagerImpl#restoreState()
+                    try {
+                        produceAbortedMessages();
+                    } catch (final Exception fatal) {
+                        error.set(fatal);
+                    }
+                }
+
+                @Override
+                public void onBatchRestored(final TopicPartition topicPartition,
+                                            final String storeName,
+                                            final long batchEndOffset,
+                                            final long numRestored) { }
+
+                @Override
+                public void onRestoreEnd(final TopicPartition topicPartition,
+                                         final String storeName,
+                                         final long totalRestored) { }
+            });
+            final Exception fatal = error.get();
+            if (fatal != null) {
+                throw fatal;
+            }
+        } else {
+            startStreams();
+        }
+
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        // skip record <3L, "C"> because we won't read it (cf checkpoint file above)
+        expected.put(4L, "D");
+
+        final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
+            .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
+        assertNotNull(store);
+
+        final Map<Long, String> storeContent = new HashMap<>();
+        TestUtils.waitForCondition(
+            () -> {
+                storeContent.clear();
+                final Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    final KeyValue<Long, String> kv = it.next();
+                    storeContent.put(kv.key, kv.value);
+                }
+                return storeContent.equals(expected);
+            },
+            30_000L,
+            () -> "waiting for initial values" +
+                "\n  expected: " + expected +
+                "\n  received: " + storeContent
+        );
+    }
+
     @Test
     public void shouldNotRestoreAbortedMessages() throws Exception {
         produceAbortedMessages();
@@ -327,17 +434,17 @@ public class GlobalKTableEOSIntegrationTest {
             .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
         assertNotNull(store);
 
-        final Map<Long, String> result = new HashMap<>();
+        final Map<Long, String> storeContent = new HashMap<>();
         TestUtils.waitForCondition(
             () -> {
-                result.clear();
-                store.all().forEachRemaining(pair -> result.put(pair.key, pair.value));
-                return result.equals(expected);
+                storeContent.clear();
+                store.all().forEachRemaining(pair -> storeContent.put(pair.key, pair.value));
+                return storeContent.equals(expected);
             },
-            30000L,
+            30_000L,
             () -> "waiting for initial values" +
                 "\n  expected: " + expected +
-                "\n  received: " + results
+                "\n  received: " + storeContent
         );
     }
 
@@ -350,7 +457,12 @@ public class GlobalKTableEOSIntegrationTest {
     }
     
     private void startStreams() {
+        startStreams(null);
+    }
+
+    private void startStreams(final StateRestoreListener stateRestoreListener) {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
         kafkaStreams.start();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 584cc94..1859a09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -81,7 +80,6 @@ import static org.junit.Assert.fail;
 
 public class GlobalStateManagerImplTest {
 
-
     private final MockTime time = new MockTime();
     private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
     private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
@@ -427,7 +425,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
+    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
         stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
@@ -1123,7 +1121,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldUseRequestTimeoutPlusTaskTimeoutInPollDuringRestoreAndFailIfNoDataReturned() {
+    public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() {
         consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
@@ -1144,62 +1142,7 @@ public class GlobalStateManagerImplTest {
         streamsConfig = new StreamsConfig(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
-            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-            mkEntry(StreamsConfig.POLL_MS_CONFIG, 5L),
-            mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L),
-            mkEntry(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100)
-        ));
-
-        stateManager = new GlobalStateManagerImpl(
-            new LogContext("mock"),
-            time,
-            topology,
-            consumer,
-            stateDirectory,
-            stateRestoreListener,
-            streamsConfig
-        );
-        processorContext.setStateManger(stateManager);
-        stateManager.setGlobalProcessorContext(processorContext);
-
-        final long startTime = time.milliseconds();
-        final TimeoutException exception = assertThrows(
-            TimeoutException.class,
-            () -> stateManager.initialize()
-        );
-        assertThat(
-            exception.getMessage(),
-            equalTo("Global task did not make progress to restore state within 10 ms. Adjust `task.timeout.ms` if needed.")
-        );
-
-        assertThat(time.milliseconds() - startTime, equalTo(110L));
-
-    }
-
-    @Test
-    public void shouldTimeoutWhenNoProgressDuringRestore() {
-        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
-                time.sleep(1L);
-                return super.poll(timeout);
-            }
-        };
-
-        final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
-        startOffsets.put(t1, 1L);
-        final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(t1, 3L);
-        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
-        consumer.assign(Collections.singletonList(t1));
-        consumer.updateBeginningOffsets(startOffsets);
-        consumer.updateEndOffsets(endOffsets);
-
-        streamsConfig = new StreamsConfig(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
-            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
-            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-            mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 5L)
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
         ));
 
         stateManager = new GlobalStateManagerImpl(
@@ -1222,9 +1165,9 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(
             exception.getMessage(),
-            equalTo("Global task did not make progress to restore state within 5 ms. Adjust `task.timeout.ms` if needed.")
+            equalTo("Global task did not make progress to restore state within 301000 ms. Adjust `task.timeout.ms` if needed.")
         );
-        assertThat(time.milliseconds() - startTime, equalTo(1L));
+        assertThat(time.milliseconds() - startTime, equalTo(331_100L));
     }
 
     private void writeCorruptCheckpoint() throws IOException {