Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/18 20:59:18 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10894: KAFKA-12951: restore must terminate for tx global topic

mjsax commented on a change in pull request #10894:
URL: https://github.com/apache/kafka/pull/10894#discussion_r654155821



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -288,6 +288,19 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
                 if (records.isEmpty()) {
+                    // get consumer position to step over potential TX markers
+                    offset = retryUntilSuccessOrThrowOnTaskTimeout(
+                        () -> globalConsumer.position(topicPartition),
+                        String.format(
+                            "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                            topicPartition
+                        )
+                    );
+
+                    if (offset == highWatermark) {

Review comment:
       Could it mask a bug? `offset` should never be larger than `highWatermark`, right?
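A minimal, dependency-free sketch of the termination problem this hunk addresses. A transaction marker (control record) can occupy the last offset below the high watermark. `poll()` never returns control records, so if `offset` is tracked as "last returned record + 1", it stops one short and the loop spins. `FakePartition`, `restore()`, and the two-records-per-poll batch size are hypothetical simplifications, not Kafka's consumer or `GlobalStateManagerImpl`.

```java
import java.util.ArrayList;
import java.util.List;

class TxMarkerRestoreSketch {

    // Hypothetical in-memory partition: true = data record, false = transaction marker.
    static final class FakePartition {
        private final boolean[] log;
        private long position = 0L;

        FakePartition(final boolean[] log) {
            this.log = log;
        }

        long highWatermark() {
            return log.length;
        }

        long position() {
            return position;
        }

        // Returns offsets of up to maxRecords data records. Markers are never returned,
        // but reading past them still advances the position (similar to a real consumer).
        List<Long> poll(final int maxRecords) {
            final List<Long> records = new ArrayList<>();
            while (position < log.length && records.size() < maxRecords) {
                if (log[(int) position]) {
                    records.add(position);
                }
                position++;
            }
            return records;
        }
    }

    // Returns the number of restored records, or -1 if the loop was still running after maxPolls.
    static long restore(final FakePartition partition, final boolean usePositionOnEmptyPoll, final int maxPolls) {
        final long highWatermark = partition.highWatermark();
        long offset = 0L;
        long restored = 0L;
        for (int polls = 0; offset < highWatermark; polls++) {
            if (polls == maxPolls) {
                return -1L; // an unbounded `while` loop would spin here forever
            }
            final List<Long> records = partition.poll(2);
            if (records.isEmpty()) {
                if (usePositionOnEmptyPoll) {
                    offset = partition.position(); // step over potential TX markers
                }
            } else {
                restored += records.size();
                offset = records.get(records.size() - 1) + 1;
            }
        }
        return restored;
    }

    public static void main(final String[] args) {
        final boolean[] log = {true, true, true, false}; // three records, then a commit marker
        System.out.println(restore(new FakePartition(log), false, 10)); // -1
        System.out.println(restore(new FakePartition(log), true, 10));  // 3
    }
}
```

With `usePositionOnEmptyPoll` set, the empty poll moves `offset` from 3 to 4. That equals the high watermark, so the loop exits.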

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -288,6 +288,19 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
                 if (records.isEmpty()) {
+                    // get consumer position to step over potential TX markers
+                    offset = retryUntilSuccessOrThrowOnTaskTimeout(
+                        () -> globalConsumer.position(topicPartition),
+                        String.format(
+                            "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                            topicPartition
+                        )
+                    );
+
+                    if (offset == highWatermark) {

Review comment:
       Good catch. I also updated the tests to cover this case.
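The sanity check discussed in this thread could look like the following sketch. It treats `position == highWatermark` as done but fails loudly on `position > highWatermark`, instead of letting a `>=` comparison hide it. `validatePosition` and `rejects` are hypothetical names used only for illustration. The actual fix and tests in the PR may differ.

```java
class PositionGuardSketch {

    // Hypothetical helper: accept the consumer position only if it does not overshoot
    // the high watermark captured before restoration started.
    static long validatePosition(final long position, final long highWatermark) {
        if (position > highWatermark) {
            throw new IllegalStateException(String.format(
                "Consumer position %d is larger than the high watermark %d; this indicates a bug.",
                position, highWatermark));
        }
        return position;
    }

    // Returns true if validatePosition rejects the given values (an assertThrows-style check).
    static boolean rejects(final long position, final long highWatermark) {
        try {
            validatePosition(position, highWatermark);
            return false;
        } catch (final IllegalStateException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(rejects(10L, 10L)); // false: reaching the high watermark ends the restore loop
        System.out.println(rejects(11L, 10L)); // true: overshooting is reported, not masked
    }
}
```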

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##########
@@ -339,17 +446,17 @@ public void shouldNotRestoreAbortedMessages() throws Exception {
             .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
         assertNotNull(store);
 
-        final Map<Long, String> result = new HashMap<>();
+        final Map<Long, String> storeContent = new HashMap<>();
         TestUtils.waitForCondition(
             () -> {
-                result.clear();
-                store.all().forEachRemaining(pair -> result.put(pair.key, pair.value));
-                return result.equals(expected);
+                storeContent.clear();
+                store.all().forEachRemaining(pair -> storeContent.put(pair.key, pair.value));
+                return storeContent.equals(expected);
             },
-            30000L,
+            30_000L,
             () -> "waiting for initial values" +
                 "\n  expected: " + expected +
-                "\n  received: " + results

Review comment:
       Side fix: this should have been `result` (singular), not `results` (plural), to refer to the local variable instead of the global one. I renamed the variable to `storeContent` to avoid this issue in the future.
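For illustration, here is a simplified, hypothetical `waitForCondition`. It is not the exact signature of Kafka's test utilities. It shows the pattern after the rename: the condition and the failure-message supplier both read the same local `storeContent`, so the "received" part of the message reports what the condition actually observed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

class WaitForConditionSketch {

    // Hypothetical test helper: re-checks `condition` until it holds or `maxWaitMs` elapses,
    // then fails with the lazily built `details` message.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long maxWaitMs,
                                 final Supplier<String> details) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details.get());
            }
            Thread.sleep(10L);
        }
    }

    // Compares a (fake) store with `expected`; returns null on success, otherwise the failure message.
    static String check(final Map<Long, String> store, final Map<Long, String> expected) {
        final Map<Long, String> storeContent = new HashMap<>();
        try {
            waitForCondition(
                () -> {
                    storeContent.clear();
                    storeContent.putAll(store);
                    return storeContent.equals(expected);
                },
                100L,
                // refers to the same local variable the condition fills in
                () -> "waiting for initial values" +
                    "\n  expected: " + expected +
                    "\n  received: " + storeContent);
            return null;
        } catch (final AssertionError e) {
            return e.getMessage();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(final String[] args) {
        final Map<Long, String> expected = new HashMap<>();
        expected.put(1L, "a");
        expected.put(2L, "b");
        final Map<Long, String> store = new HashMap<>();
        store.put(1L, "a"); // key 2 never shows up, so the wait times out
        System.out.println(check(store, expected));
    }
}
```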

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -288,6 +288,22 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);

Review comment:
       Ah, good point. We would need to refactor this anyway (cf. the long comment above). Seems it's time to do it now :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -288,6 +288,22 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
                 if (records.isEmpty()) {
+                    // get consumer position to step over potential TX markers
+                    offset = retryUntilSuccessOrThrowOnTaskTimeout(
+                        () -> globalConsumer.position(topicPartition),
+                        String.format(

Review comment:
       Good idea!
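This is a hedged sketch of what a helper like `retryUntilSuccessOrThrowOnTaskTimeout` does, inferred only from its name and the call site above. It retries a call that fails with a transient error and gives up once `task.timeout.ms` has passed since the first failure. `TransientException`, the injectable clock, and `IllegalStateException` as the final error are assumptions for the sketch. The real helper in `GlobalStateManagerImpl` may differ, for example in which exception types it catches and throws.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class RetryOnTaskTimeoutSketch {

    // Hypothetical stand-in for a retriable error, e.g. a broker that is briefly unavailable.
    static final class TransientException extends RuntimeException {
        TransientException(final String message) {
            super(message);
        }
    }

    // Retries `supplier` until it succeeds. The task timeout starts at the first failure;
    // once it has elapsed, the last error is wrapped with `errorMessage` and rethrown.
    static <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
                                                       final String errorMessage,
                                                       final long taskTimeoutMs,
                                                       final LongSupplier clockMs) {
        long deadlineMs = -1L; // -1 means no failure seen yet
        while (true) {
            try {
                return supplier.get();
            } catch (final TransientException e) {
                final long nowMs = clockMs.getAsLong();
                if (deadlineMs == -1L) {
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs >= deadlineMs) {
                    throw new IllegalStateException(errorMessage, e);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final long[] fakeNowMs = {0L};
        final LongSupplier clock = () -> fakeNowMs[0] += 1_000L; // each read advances the fake clock by 1 s
        final int[] attempts = {0};
        final Long position = retryUntilSuccessOrThrowOnTaskTimeout(
            () -> {
                if (++attempts[0] < 3) {
                    throw new TransientException("broker unavailable");
                }
                return 42L;
            },
            "Failed to get position for partition topic-0.",
            5_000L,
            clock);
        System.out.println(position + " after " + attempts[0] + " attempts"); // 42 after 3 attempts
    }
}
```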

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -262,34 +260,19 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
-            while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317)
-                                             // we should update the `poll()` timeout below
-
-                // we ignore `poll.ms` config during bootstrapping phase and
-                // apply `request.timeout.ms` plus `task.timeout.ms` instead
-                //
-                // the reason is, that `poll.ms` might be too short to give a fetch request a fair chance
-                // to actually complete and we don't want to start `task.timeout.ms` too early
-                //
-                // we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code:
-                // if we don't pass it in, we would just track the timeout ourselves and call `poll()` again
-                // in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead
-                //
-                // note that using `request.timeout.ms` provides a conservative upper bound for the timeout;
-                // this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout
-                // delayed is preferable (as it's more robust) than starting it too early
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
                 //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10315
-                //   -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails
-                //      (instead of letting the consumer retry fetch requests silently)
-                //
-                // TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
-                //      https://issues.apache.org/jira/browse/KAFKA-7380
-                //  -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
-                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //      `poll(pollMS)` without adding the request timeout and do a more precise
+                //      timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);

Review comment:
       I still think we should pass `poll.ms` plus `request.timeout.ms` as long as we don't have https://issues.apache.org/jira/browse/KAFKA-10315.
   
   This might still result in a 30-second startup delay if `poll()` blocks because there is no data, but I think that's acceptable (at least compared to 5 minutes...).
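The arithmetic behind the "30 seconds versus 5 minutes" comparison, as a small sketch. The numbers assume the defaults: `poll.ms` = 100 ms, consumer `request.timeout.ms` = 30 s, and `task.timeout.ms` = 5 min. An application's actual config may override any of them.

```java
import java.time.Duration;

class PollTimeoutSketch {

    // Assumed default values; real values come from the Streams and consumer configs.
    static final long POLL_MS = 100L;
    static final long REQUEST_TIMEOUT_MS = 30_000L;
    static final long TASK_TIMEOUT_MS = 300_000L;

    // Maximum time a single poll() could block during restore before the change.
    static Duration requestTimeoutPlusTaskTimeout(final long requestTimeoutMs, final long taskTimeoutMs) {
        return Duration.ofMillis(requestTimeoutMs + taskTimeoutMs);
    }

    // Maximum time a single poll() blocks after the change; the task timeout is then
    // tracked across polls instead of being passed into poll().
    static Duration pollMsPlusRequestTimeout(final long pollMs, final long requestTimeoutMs) {
        return Duration.ofMillis(pollMs + requestTimeoutMs);
    }

    public static void main(final String[] args) {
        System.out.println(requestTimeoutPlusTaskTimeout(REQUEST_TIMEOUT_MS, TASK_TIMEOUT_MS)); // PT5M30S
        System.out.println(pollMsPlusRequestTimeout(POLL_MS, REQUEST_TIMEOUT_MS));              // PT30.1S
    }
}
```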

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -1044,7 +1043,7 @@ public synchronized long position(final TopicPartition partition) {
     }
 
     @Test
-    public void shouldUseRequestTimeoutPlusTaskTimeoutInPollDuringRestoreAndFailIfNoDataReturned() {
+    public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() {

Review comment:
       After the refactoring of how we handle `task.timeout.ms` during global restore, this test and the next test covered exactly the same thing. So I removed one test to avoid redundancy. (The diff is a little weird as it cuts off in the middle...)
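The behavior named in the renamed test, "timeout when no progress during restore", can be sketched with a hypothetical no-progress timer driven by a fake clock. The timeout starts on the first poll that returns nothing, resets whenever restoration advances, and fires once `task.timeout.ms` has elapsed without progress. `NoProgressTimer` and `pollsUntilTimeout` are illustrative names, not the code under test.

```java
class NoProgressTimeoutSketch {

    // Hypothetical tracker: the task timeout starts on the first poll without progress,
    // is reset by any progress, and fires once it has elapsed.
    static final class NoProgressTimer {
        private final long taskTimeoutMs;
        private long deadlineMs = -1L; // -1 means "no timeout running"

        NoProgressTimer(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        void onPoll(final long nowMs, final boolean madeProgress) {
            if (madeProgress) {
                deadlineMs = -1L;
            } else if (deadlineMs == -1L) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs >= deadlineMs) {
                throw new IllegalStateException(
                    "Restore made no progress for " + taskTimeoutMs + " ms");
            }
        }
    }

    // Simulates polls that never return data, each taking pollDurationMs on a fake clock;
    // returns how many polls happen before the timer fires.
    static int pollsUntilTimeout(final long pollDurationMs, final long taskTimeoutMs) {
        final NoProgressTimer timer = new NoProgressTimer(taskTimeoutMs);
        for (int polls = 1; ; polls++) {
            try {
                timer.onPoll(polls * pollDurationMs, false);
            } catch (final IllegalStateException timeout) {
                return polls;
            }
        }
    }

    public static void main(final String[] args) {
        // 30.1 s per empty poll and a 5 min task timeout
        System.out.println(pollsUntilTimeout(30_100L, 300_000L)); // 11
    }
}
```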




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org