You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/18 17:28:57 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8896: KAFKA-10185: Restoration info logging

vvcephei opened a new pull request #8896:
URL: https://github.com/apache/kafka/pull/8896


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8896:
URL: https://github.com/apache/kafka/pull/8896


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442529975



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -496,8 +539,9 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f
             } else {
                 changelogMetadata.bufferedRecords.add(record);
                 final long offset = record.offset();
-                if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset)
+                if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) {
                     changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size();
+                }

Review comment:
       I've rolled back a bunch of accidental formatting changes, but left the ones that are actually code style compliance issues (like using brackets around conditional bodies).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r443005504



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##########
@@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() {
     @Test
     public void shouldPollWithRightTimeout() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));

Review comment:
       I didn't think to do this... This might be equivocation, but it seems like if I wrote that in a code comment, it may or may not be true in the future. Looking at the tests, there are already like a dozen cryptic, redundant mocks, so I'm not sure justifying this one really makes a material impact on this test's readability, which is already approaching zero.
   
   Adding a comment like "this is just to prevent the logger from throwing an NPE" carries the risk that it can quickly become untrue in two ways:
   1. Maybe we remove or change the log so that it wouldn't need this mock; since it's a "nice" mock, we'll never know. In fact, I can't verify this call because the way the logger is configured only to print every ten seconds makes the NPE nondeterministic. Plus, it's not great to verify stuff that is beside the point of the test.
   2. Maybe we change the implementation so that it actually does exercise this mocked behavior, then the comment will become untrue, but we may not even notice.
   
   Typically, having this many specific and complex mocks in a test indicates that we shouldn't be using easymock, but instead configure the component with "dummy" state manager, etc. If we re-wrote this test to use that strategy, then we wouldn't need to make explicit expectations like this.
   
   Anyway, that's why I'm sort of inclined on just declaring bankruptcy on the comprehensibility of this test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442397523



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
       trivial cleanup

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);

Review comment:
       Added the exception itself as the "cause" of the warning. The actual message of the IOE is actually pretty good at explaining the root cause.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -446,6 +450,38 @@ public void restore() {
             }
 
             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();

Review comment:
       This is the main change. Once every ten seconds, we will log the progress for each active restoring changelog.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);
 
                 final Map<TaskId, Collection<TopicPartition>> taskWithCorruptedChangelogs = new HashMap<>();
                 for (final TopicPartition partition : e.partitions()) {
                     final TaskId taskId = changelogs.get(partition).stateManager.taskId();
                     taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition);
                 }
-                throw new TaskCorruptedException(taskWithCorruptedChangelogs);
+                throw new TaskCorruptedException(taskWithCorruptedChangelogs, e);

Review comment:
       Also added the cause to the thrown exception.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r443004192



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -458,9 +462,48 @@ public void restore() {
             }
 
             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();
+        }
+    }
+
+    private void maybeLogRestorationProgress() {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
+                final Set<TopicPartition> topicPartitions = activeRestoringChangelogs();
+                if (!topicPartitions.isEmpty()) {
+                    final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
+                                                                     .append(topicPartitions.size())
+                                                                     .append(" partitions.");
+                    for (final TopicPartition partition : topicPartitions) {

Review comment:
       Actually I do have complaints about the StreamsPartitionAssignor log entries haha :)
   
   Anyways, I think `grep` a valid reason. My rationale was that when searching for this entry, most people would use "Restoration in progress for" and then manually check if the particular interested partition in the following line, but I guess I'm just biased because I'm not a heavy grep user.
   
   It is a quite nit comment and I won't feel strong about it. Your call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442531300



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##########
@@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() {
     @Test
     public void shouldPollWithRightTimeout() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));

Review comment:
       This is moderately obnoxious... The addition of logging these values means that these tests will get a NullPointerException unless we mock this call, but the mock is irrelevant to the test outcome.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#issuecomment-646823870


   Unrelated failure:
   `org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r443000076



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);

Review comment:
       It is still there, on L424.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r443001288



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);

Review comment:
       Ah got it, I'm still think about it as the string template and was overlooking that. SG.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442931356



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -458,9 +462,48 @@ public void restore() {
             }
 
             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();
+        }
+    }
+
+    private void maybeLogRestorationProgress() {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
+                final Set<TopicPartition> topicPartitions = activeRestoringChangelogs();
+                if (!topicPartitions.isEmpty()) {
+                    final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
+                                                                     .append(topicPartitions.size())
+                                                                     .append(" partitions.");
+                    for (final TopicPartition partition : topicPartitions) {

Review comment:
       nit: should we have a newline for each partition? Otherwise that ling maybe too long.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -415,19 +418,20 @@ public void restore() {
                 // for restoring active and updating standby we may prefer different poll time
                 // in order to make sure we call the main consumer#poll in time.
                 // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime);
+                polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
             } catch (final InvalidOffsetException e) {
-                log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " +
+                log.warn("Encountered " + e.getClass().getName() +
+                    " fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " +
                     "the consumer's position has fallen out of the topic partition offset range because the topic was " +
                     "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" +
-                    " it later.", e.getClass().getName(), e.partitions());
+                    " it later.", e);

Review comment:
       The exception message may not always contain the `partitions()` list, maybe we should still print that as part of warn log?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##########
@@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() {
     @Test
     public void shouldPollWithRightTimeout() {
         EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L));

Review comment:
       This comment seems worth adding to the code :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r443001442



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -458,9 +462,48 @@ public void restore() {
             }
 
             maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+            maybeLogRestorationProgress();
+        }
+    }
+
+    private void maybeLogRestorationProgress() {
+        if (state == ChangelogReaderState.ACTIVE_RESTORING) {
+            if (time.milliseconds() - lastRestoreLogTime > RESTORE_LOG_INTERVAL_MS) {
+                final Set<TopicPartition> topicPartitions = activeRestoringChangelogs();
+                if (!topicPartitions.isEmpty()) {
+                    final StringBuilder builder = new StringBuilder().append("Restoration in progress for ")
+                                                                     .append(topicPartitions.size())
+                                                                     .append(" partitions.");
+                    for (final TopicPartition partition : topicPartitions) {

Review comment:
       I thought about it; while it does make the logs easier to read, it makes them harder to search (as in `grep`, since the line that would match the query doesn't contain all the information.
   
   We do have other places where we concatenate every topic-partition on a single line, eg in the StreamsPartitionAssignor, so I think if long lines were a problem, people would already be complaining.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org