You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "showuon (via GitHub)" <gi...@apache.org> on 2023/04/06 05:41:47 UTC

[GitHub] [kafka] showuon opened a new pull request, #13512: MINOR: fix stream failing tests

showuon opened a new pull request, #13512:
URL: https://github.com/apache/kafka/pull/13512

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159574976


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -690,7 +691,7 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
             }
         }
 
-        if (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) {
+        if (task != null && (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED))) {

Review Comment:
   In https://github.com/apache/kafka/pull/13300/files#r1123945673 , we discussed if the null check is necessary, and then decided to drop it. But it turns out it's necessary. Otherwise, NPE will be thrown in some test cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on pull request #13512: MINOR: fix stream failing tests

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on PR #13512:
URL: https://github.com/apache/kafka/pull/13512#issuecomment-1499193135

   I couldn't change this PR, so I added a separate PR for the last failing test: https://github.com/apache/kafka/pull/13519
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159580916


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java:
##########
@@ -254,6 +255,7 @@ public void shouldGetRecordLatenessSensor() {
     }
 
     @Test
+    @Ignore
     public void shouldGetDroppedRecordsSensor() {

Review Comment:
   It failed with "Expected to happen once, However, there were exactly 2 interactions with this mock:". Skip it first.
   
   https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/1738/testReport/junit/org.apache.kafka.streams.processor.internals.metrics/TaskMetricsTest/Build___JDK_8_and_Scala_2_12___shouldGetDroppedRecordsSensor/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13512:
URL: https://github.com/apache/kafka/pull/13512#issuecomment-1499022896

   @lucasbru @guozhangwang @mjsax , I can see there's still a related test failed: 
   ```
   org.apache.kafka.streams.processor.internals.ReadOnlyTaskTest.shouldThrowUnsupportedOperationExceptionForForbiddenMethods
   ```
   But overall, it looks better now. I think we should merge this PR soon, and make further improvement/fix later. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159579368


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -987,14 +988,6 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks,
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed on batch restored", e);
                 }
-
-                final TaskId taskId = changelogs.get(partition).stateManager.taskId();
-                final StreamTask task = (StreamTask) tasks.get(taskId);
-                // if the log is truncated between when we get the log end offset and when we get the
-                // consumer position, then it's possible that the difference become negative and there's actually
-                // no records to restore; in this case we just initialize the sensor to zero
-                final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L);
-                task.initRemainingRecordsToRestore(time, recordsToRestore);

Review Comment:
   With these lines, we got some errors related to "cannot cast the task to `StreamTask`". Might need to investigate why it will happen. Remove them first to fix the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #13512: MINOR: fix stream failing tests

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13512:
URL: https://github.com/apache/kafka/pull/13512


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on pull request #13512: MINOR: fix stream failing tests

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on PR #13512:
URL: https://github.com/apache/kafka/pull/13512#issuecomment-1499182228

   LGTM, but we will need @mjsax or @guozhangwang to merge this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159573303


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,14 +644,15 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;
 
         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
+            final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));

Review Comment:
   We'll get concurrent modification for the list exception in L660 `records.size()` since there are also other threads updating buffer records. Fixing it by creating a clone of the list, and remove all from the `bufferedRecords` below. We can optimize it later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on a diff in pull request #13512: MINOR: fix stream failing tests

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13512:
URL: https://github.com/apache/kafka/pull/13512#discussion_r1159573303


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,14 +644,15 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;
 
         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords);
+            final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));

Review Comment:
   We'll get concurrent modification for the list exception in L660 `records.size()`. Fixing it by creating a clone of the list, and remove all from the `bufferedRecords` below. We can optimize it later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org