You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/06/28 21:56:34 UTC
[kafka] branch 2.8 updated: KAFKA-12951: restore must terminate for tx global topic (#10894)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new bfb1fdb KAFKA-12951: restore must terminate for tx global topic (#10894)
bfb1fdb is described below
commit bfb1fdb9f7f9f547b42ded973a4ab68c81617fff
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon Jun 28 14:10:25 2021 -0700
KAFKA-12951: restore must terminate for tx global topic (#10894)
Reviewers: Guozhang Wang <gu...@confluent.io>, Luke Chen <sh...@gmail.com>, Gasparina Damien <d....@gmail.com>
---
.../internals/GlobalStateManagerImpl.java | 72 ++++------
.../GlobalKTableEOSIntegrationTest.java | 146 ++++++++++++++++++---
.../internals/GlobalStateManagerImplTest.java | 67 +---------
3 files changed, 163 insertions(+), 122 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f10c707..08fa43c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -72,7 +72,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
private final StateRestoreListener stateRestoreListener;
private InternalProcessorContext globalProcessorContext;
- private final Duration requestTimeoutPlusTaskTimeout;
+ private final Duration pollMsPlusRequestTimeout;
private final long taskTimeoutMs;
private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
private final OffsetCheckpoint checkpointFile;
@@ -112,9 +112,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
final int requestTimeoutMs = new ClientUtils.QuietConsumerConfig(consumerProps)
.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ pollMsPlusRequestTimeout = Duration.ofMillis(
+ config.getLong(StreamsConfig.POLL_MS_CONFIG) + requestTimeoutMs
+ );
taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG);
- requestTimeoutPlusTaskTimeout =
- Duration.ofMillis(requestTimeoutMs + taskTimeoutMs);
}
@Override
@@ -252,6 +253,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
final String storeName,
final RecordConverter recordConverter) {
for (final TopicPartition topicPartition : topicPartitions) {
+ long currentDeadline = NO_DEADLINE;
+
globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
final Long checkpoint = checkpointFileCache.get(topicPartition);
@@ -260,13 +263,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
offset = checkpoint;
} else {
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
- offset = retryUntilSuccessOrThrowOnTaskTimeout(
- () -> globalConsumer.position(topicPartition),
- String.format(
- "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
- topicPartition
- )
- );
+ offset = getGlobalConsumerOffset(topicPartition);
}
final Long highWatermark = highWatermarks.get(topicPartition);
@@ -276,34 +273,19 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
long restoreCount = 0L;
- while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317)
- // we should update the `poll()` timeout below
-
- // we ignore `poll.ms` config during bootstrapping phase and
- // apply `request.timeout.ms` plus `task.timeout.ms` instead
- //
- // the reason is, that `poll.ms` might be too short to give a fetch request a fair chance
- // to actually complete and we don't want to start `task.timeout.ms` too early
- //
- // we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code:
- // if we don't pass it in, we would just track the timeout ourselves and call `poll()` again
- // in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead
- //
- // note that using `request.timeout.ms` provides a conservative upper bound for the timeout;
- // this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout
- // delayed is preferable (as it's more robust) than starting it too early
- //
- // TODO https://issues.apache.org/jira/browse/KAFKA-10315
- // -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails
- // (instead of letting the consumer retry fetch requests silently)
+ while (offset < highWatermark) {
+ // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+ // to give a fetch request a fair chance to actually complete and we don't want to
+ // start `task.timeout.ms` too early
//
- // TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
- // https://issues.apache.org/jira/browse/KAFKA-7380
- // -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
- final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+ // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+ // `poll(pollMS)` without adding the request timeout and do a more precise
+ // timeout handling
+ final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
if (records.isEmpty()) {
- // this will always throw
- maybeUpdateDeadlineOrThrow(time.milliseconds());
+ currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+ } else {
+ currentDeadline = NO_DEADLINE;
}
final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
@@ -313,13 +295,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
- offset = retryUntilSuccessOrThrowOnTaskTimeout(
- () -> globalConsumer.position(topicPartition),
- String.format(
- "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
- topicPartition
- )
- );
+ offset = getGlobalConsumerOffset(topicPartition);
stateRestoreAdapter.restoreBatch(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
@@ -330,6 +306,16 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
+ private long getGlobalConsumerOffset(final TopicPartition topicPartition) {
+ return retryUntilSuccessOrThrowOnTaskTimeout(
+ () -> globalConsumer.position(topicPartition),
+ String.format(
+ "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+ topicPartition
+ )
+ );
+ }
+
private <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
final String errorMessage) {
long deadlineMs = NO_DEADLINE;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 5706f67..00bec9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -16,9 +16,13 @@
*/
package org.apache.kafka.streams.integration;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -36,9 +40,11 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -60,6 +66,7 @@ import java.util.Properties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
@@ -113,12 +120,14 @@ public class GlobalKTableEOSIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+ streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300);
+ streamsConfiguration.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
globalTable = builder.globalTable(
globalTableTopic,
Consumed.with(Serdes.Long(), Serdes.String()),
@@ -155,7 +164,7 @@ public class GlobalKTableEOSIntegrationTest {
TestUtils.waitForCondition(
() -> results.equals(expected),
- 30000L,
+ 30_000L,
() -> "waiting for initial values;" +
"\n expected: " + expected +
"\n received: " + results
@@ -183,7 +192,7 @@ public class GlobalKTableEOSIntegrationTest {
replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value));
return globalState.equals(expectedState);
},
- 30000,
+ 30_000L,
() -> "waiting for data in replicated store" +
"\n expected: " + expectedState +
"\n received: " + globalState
@@ -200,7 +209,7 @@ public class GlobalKTableEOSIntegrationTest {
TestUtils.waitForCondition(
() -> results.equals(expected),
- 30000L,
+ 30_000L,
() -> "waiting for final values" +
"\n expected: " + expected +
"\n received: " + results
@@ -223,7 +232,7 @@ public class GlobalKTableEOSIntegrationTest {
TestUtils.waitForCondition(
() -> results.equals(expected),
- 30000L,
+ 30_000L,
() -> "waiting for initial values" +
"\n expected: " + expected +
"\n received: " + results
@@ -251,7 +260,7 @@ public class GlobalKTableEOSIntegrationTest {
replicatedStore.all().forEachRemaining(pair -> globalState.put(pair.key, pair.value));
return globalState.equals(expectedState);
},
- 30000,
+ 30_000L,
() -> "waiting for data in replicated store" +
"\n expected: " + expectedState +
"\n received: " + globalState
@@ -268,7 +277,7 @@ public class GlobalKTableEOSIntegrationTest {
TestUtils.waitForCondition(
() -> results.equals(expected),
- 30000L,
+ 30_000L,
() -> "waiting for final values" +
"\n expected: " + expected +
"\n received: " + results
@@ -302,13 +311,111 @@ public class GlobalKTableEOSIntegrationTest {
}
return result.equals(expected);
},
- 30000L,
+ 30_000L,
() -> "waiting for initial values" +
"\n expected: " + expected +
- "\n received: " + results
+ "\n received: " + result
);
}
-
+
+ @Test
+ public void shouldSkipOverTxMarkersOnRestore() throws Exception {
+ shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(false);
+ }
+
+ @Test
+ public void shouldSkipOverAbortedMessagesOnRestore() throws Exception {
+ shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(true);
+ }
+
+ private void shouldSkipOverTxMarkersAndAbortedMessagesOnRestore(final boolean appendAbortedMessages) throws Exception {
+ // records with key 1L, 2L, and 4L are written into partition-0
+ // record with key 3L is written into partition-1
+ produceInitialGlobalTableValues();
+
+ final String stateDir = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
+ final File globalStateDir = new File(
+ stateDir
+ + File.separator
+ + streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
+ + File.separator
+ + "global");
+ assertTrue(globalStateDir.mkdirs());
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(globalStateDir, ".checkpoint"));
+
+ // set the checkpointed offset to the commit marker of partition-1
+ // even if `poll()` won't return any data for partition-1, we should still finish the restore
+ checkpoint.write(Collections.singletonMap(new TopicPartition(globalTableTopic, 1), 1L));
+
+ if (appendAbortedMessages) {
+ final AtomicReference<Exception> error = new AtomicReference<>();
+ startStreams(new StateRestoreListener() {
+ @Override
+ public void onRestoreStart(final TopicPartition topicPartition,
+ final String storeName,
+ final long startingOffset,
+ final long endingOffset) {
+ // we need to write aborted messages only after we init the `highWatermark`
+ // to move the `endOffset` beyond the `highWatermark`
+ //
+ // we cannot write committed messages because we want to test the case that
+ // poll() returns no records
+ //
+ // cf. GlobalStateManagerImpl#restoreState()
+ try {
+ produceAbortedMessages();
+ } catch (final Exception fatal) {
+ error.set(fatal);
+ }
+ }
+
+ @Override
+ public void onBatchRestored(final TopicPartition topicPartition,
+ final String storeName,
+ final long batchEndOffset,
+ final long numRestored) { }
+
+ @Override
+ public void onRestoreEnd(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) { }
+ });
+ final Exception fatal = error.get();
+ if (fatal != null) {
+ throw fatal;
+ }
+ } else {
+ startStreams();
+ }
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(1L, "A");
+ expected.put(2L, "B");
+ // skip record <3L, "C"> because we won't read it (cf. checkpoint file above)
+ expected.put(4L, "D");
+
+ final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
+ .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
+ assertNotNull(store);
+
+ final Map<Long, String> storeContent = new HashMap<>();
+ TestUtils.waitForCondition(
+ () -> {
+ storeContent.clear();
+ final Iterator<KeyValue<Long, String>> it = store.all();
+ while (it.hasNext()) {
+ final KeyValue<Long, String> kv = it.next();
+ storeContent.put(kv.key, kv.value);
+ }
+ return storeContent.equals(expected);
+ },
+ 30_000L,
+ () -> "waiting for initial values" +
+ "\n expected: " + expected +
+ "\n received: " + storeContent
+ );
+ }
+
@Test
public void shouldNotRestoreAbortedMessages() throws Exception {
produceAbortedMessages();
@@ -327,17 +434,17 @@ public class GlobalKTableEOSIntegrationTest {
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);
- final Map<Long, String> result = new HashMap<>();
+ final Map<Long, String> storeContent = new HashMap<>();
TestUtils.waitForCondition(
() -> {
- result.clear();
- store.all().forEachRemaining(pair -> result.put(pair.key, pair.value));
- return result.equals(expected);
+ storeContent.clear();
+ store.all().forEachRemaining(pair -> storeContent.put(pair.key, pair.value));
+ return storeContent.equals(expected);
},
- 30000L,
+ 30_000L,
() -> "waiting for initial values" +
"\n expected: " + expected +
- "\n received: " + results
+ "\n received: " + storeContent
);
}
@@ -350,7 +457,12 @@ public class GlobalKTableEOSIntegrationTest {
}
private void startStreams() {
+ startStreams(null);
+ }
+
+ private void startStreams(final StateRestoreListener stateRestoreListener) {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+ kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
kafkaStreams.start();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 584cc94..1859a09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
@@ -81,7 +80,6 @@ import static org.junit.Assert.fail;
public class GlobalStateManagerImplTest {
-
private final MockTime time = new MockTime();
private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
@@ -427,7 +425,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
+ public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
stateManager.initialize();
initializeConsumer(1, 0, t1);
stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
@@ -1123,7 +1121,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldUseRequestTimeoutPlusTaskTimeoutInPollDuringRestoreAndFailIfNoDataReturned() {
+ public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() {
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
@@ -1144,62 +1142,7 @@ public class GlobalStateManagerImplTest {
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
- mkEntry(StreamsConfig.POLL_MS_CONFIG, 5L),
- mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L),
- mkEntry(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100)
- ));
-
- stateManager = new GlobalStateManagerImpl(
- new LogContext("mock"),
- time,
- topology,
- consumer,
- stateDirectory,
- stateRestoreListener,
- streamsConfig
- );
- processorContext.setStateManger(stateManager);
- stateManager.setGlobalProcessorContext(processorContext);
-
- final long startTime = time.milliseconds();
- final TimeoutException exception = assertThrows(
- TimeoutException.class,
- () -> stateManager.initialize()
- );
- assertThat(
- exception.getMessage(),
- equalTo("Global task did not make progress to restore state within 10 ms. Adjust `task.timeout.ms` if needed.")
- );
-
- assertThat(time.milliseconds() - startTime, equalTo(110L));
-
- }
-
- @Test
- public void shouldTimeoutWhenNoProgressDuringRestore() {
- consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
- @Override
- public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
- time.sleep(1L);
- return super.poll(timeout);
- }
- };
-
- final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
- startOffsets.put(t1, 1L);
- final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
- endOffsets.put(t1, 3L);
- consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
- consumer.assign(Collections.singletonList(t1));
- consumer.updateBeginningOffsets(startOffsets);
- consumer.updateEndOffsets(endOffsets);
-
- streamsConfig = new StreamsConfig(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
- mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 5L)
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));
stateManager = new GlobalStateManagerImpl(
@@ -1222,9 +1165,9 @@ public class GlobalStateManagerImplTest {
);
assertThat(
exception.getMessage(),
- equalTo("Global task did not make progress to restore state within 5 ms. Adjust `task.timeout.ms` if needed.")
+ equalTo("Global task did not make progress to restore state within 301000 ms. Adjust `task.timeout.ms` if needed.")
);
- assertThat(time.milliseconds() - startTime, equalTo(1L));
+ assertThat(time.milliseconds() - startTime, equalTo(331_100L));
}
private void writeCorruptCheckpoint() throws IOException {