Posted to jira@kafka.apache.org by "sambhav-jain-16 (via GitHub)" <gi...@apache.org> on 2023/04/27 07:22:19 UTC

[GitHub] [kafka] sambhav-jain-16 opened a new pull request, #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

sambhav-jain-16 opened a new pull request, #13646:
URL: https://github.com/apache/kafka/pull/13646

   Fixes the flaky test `org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary` by using `consume()` in place of `consumeAll()`. `consume()` lets us specify the minimum number of records to consume, which is sufficient for the assertion this test makes.
   
   
   Ran the test successfully ~100 times
   ```
   BUILD SUCCESSFUL in 11s
   75 actionable tasks: 1 executed, 74 up-to-date
   Completed run: 110
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1602941862

   Thanks for the detailed analysis, everyone!
   
   ## Root cause
   
   I believe @sudeshwasnik's latest theory is correct: the Connect runtime invokes `SourceTask::commitRecord` even for records in aborted transactions, which causes `ConnectorHandle::awaitCommits` to return before the expected number of (non-aborted) records has been produced to Kafka.
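
   To make the root cause concrete, here is a minimal sketch in plain Java with no Kafka dependencies. The `Txn` class and the `visibleWhenLatchOpens` method are hypothetical; only the latch name `recordsToCommitLatch` is borrowed from the discussion in this thread. It assumes, per the theory above, that the latch is counted down once per record whether or not the record's transaction was aborted.

   ```java
   import java.util.concurrent.CountDownLatch;

   final class CommitLatchSketch {
       /** Hypothetical stand-in for one transaction emitted by the task. */
       static final class Txn {
           final int size;
           final boolean aborted;

           Txn(int size, boolean aborted) {
               this.size = size;
               this.aborted = aborted;
           }
       }

       /**
        * Replays the transactions in order and returns how many records are visible
        * to a read_committed consumer when the latch first reaches zero, or -1 if
        * the latch never reaches zero.
        */
       static int visibleWhenLatchOpens(int expectedCommits, Txn... txns) {
           CountDownLatch recordsToCommitLatch = new CountDownLatch(expectedCommits);
           int visible = 0;
           for (Txn txn : txns) {
               if (!txn.aborted) {
                   visible += txn.size;
               }
               // Assumption: commitRecord fires for every record, aborted or not.
               for (int i = 0; i < txn.size; i++) {
                   recordsToCommitLatch.countDown();
               }
               if (recordsToCommitLatch.getCount() == 0) {
                   return visible;
               }
           }
           return -1;
       }

       public static void main(String[] args) {
           int visible = visibleWhenLatchOpens(10,
                   new Txn(3, false), new Txn(4, true), new Txn(3, false));
           System.out.println(visible); // prints 6
       }
   }
   ```

   In this example the latch opens after ten `commitRecord` calls while only six records are readable, which is the kind of mismatch the test can trip over.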
   
   ## Consume-all implementation
   
   I haven't been able to find any issues with `EmbeddedKafkaCluster::consumeAll`. The use of the `read_uncommitted` isolation level for fetching end offsets and the `read_committed` isolation level for consuming is intentional, and mirrors logic we use elsewhere in the Connect runtime (see [TopicAdmin::endOffsets](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L711), which is used by the `KafkaBasedLog` class when reading to the end of a topic, even if the log's consumer is configured with the `read_committed` isolation level). This ensures that, when reading to the end of a topic, we reach the end of any in-progress transactions on the topic.
   
   ## Consume vs. consume all
   
   With regards to the change in the PR--the concern with consuming a fixed number of records from the topic is that we can potentially see a gap in sequence numbers if the topic has multiple partitions, since we wouldn't be guaranteed to consume records in the same order they were produced (which is why I implemented and used `EmbeddedKafkaCluster::consumeAll` when writing these tests initially; you can find the discussion [here](https://github.com/apache/kafka/pull/11782#discussion_r912235126)).
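
   The ordering concern can be illustrated with a small, self-contained model. This is only a sketch: the round-robin partitioner and the "drain one partition, then the next" read order are simplifying assumptions (a real consumer interleaves fetches in no guaranteed order), and all names are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   final class PartitionOrderSketch {
       /** Writes seqnos 1..total round-robin across partitions (hypothetical partitioner). */
       static List<List<Integer>> produce(int total, int partitions) {
           List<List<Integer>> log = new ArrayList<>();
           for (int p = 0; p < partitions; p++) {
               log.add(new ArrayList<>());
           }
           for (int seqno = 1; seqno <= total; seqno++) {
               log.get((seqno - 1) % partitions).add(seqno);
           }
           return log;
       }

       /** Reads at most maxRecords, draining one partition before moving to the next. */
       static List<Integer> consumeFixed(List<List<Integer>> log, int maxRecords) {
           List<Integer> consumed = new ArrayList<>();
           for (List<Integer> partition : log) {
               for (int seqno : partition) {
                   if (consumed.size() == maxRecords) {
                       return consumed;
                   }
                   consumed.add(seqno);
               }
           }
           return consumed;
       }

       /** True if the seqnos, once sorted, are not exactly 1..n. */
       static boolean hasGap(List<Integer> seqnos) {
           List<Integer> sorted = new ArrayList<>(seqnos);
           Collections.sort(sorted);
           for (int i = 0; i < sorted.size(); i++) {
               if (sorted.get(i) != i + 1) {
                   return true;
               }
           }
           return false;
       }
   }
   ```

   With 10 records over 2 partitions, stopping after 6 records yields seqnos 1, 3, 5, 7, 9 and 2, so 4 is missing even though it was produced before 5; reading everything leaves no gap.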
   
   Could we stick with using `consumeAll` and instead bump the number of expected records/commits? I drafted this change locally and it seemed to work well:
   
   ```java
           // the connector aborts some transactions, which causes records that it has emitted (and for which
           // SourceTask::commitRecord has been invoked) to be invisible to consumers; we expect the task to
           // emit at most 233 records in total before 100 records have been emitted as part of one or more
           // committed transactions
           connectorHandle.expectedRecords(233);
           connectorHandle.expectedCommits(233);
   ```
   
   (This would replace the existing code [here](https://github.com/apache/kafka/blob/9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L397-L399).)
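
   The figure of 233 can be reproduced with a little arithmetic. The following is a rough sketch, not the connector's actual code: it assumes that transaction boundaries follow a Fibonacci-like sequence (1, 2, 3, 5, 8, ...), which is not stated anywhere in this thread, and it reuses the transaction-size and even-boundary abort rule from the `maybeDefineTransactionBoundary` snippet in this comment. The class and method names are made up for illustration.

   ```java
   final class TransactionBoundarySketch {
       /**
        * Walks the assumed boundary sequence and returns {emitted, committed} at the
        * first boundary where at least `target` records belong to committed transactions.
        */
       static long[] emittedUntilCommitted(long target) {
           long prior = 0;
           long next = 1;
           long committed = 0;
           while (true) {
               long transactionSize = next - prior;
               // Same rule as the snippet: even boundary means abort, odd means commit.
               boolean abort = next % 2 == 0;
               if (!abort) {
                   committed += transactionSize;
               }
               if (committed >= target) {
                   return new long[] {next, committed};
               }
               // Assumed Fibonacci-style step: 1, 2, 3, 5, 8, 13, ...
               long following = next + Math.max(prior, 1);
               prior = next;
               next = following;
           }
       }

       public static void main(String[] args) {
           long[] result = emittedUntilCommitted(100);
           // prints "233 emitted, 161 committed"
           System.out.println(result[0] + " emitted, " + result[1] + " committed");
       }
   }
   ```

   Under these assumptions, 100 committed records are first reached at the boundary after 233 emitted records. The previous committing boundary (89 emitted) leaves only 72 committed, which lines up with the "expecting 100+ but got 72" failure mentioned elsewhere in this thread.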
   
   ## Transaction size logging
   
   Also, as an aside--it was really helpful to know how many records were in each aborted/committed transaction while investigating this test. I tweaked `MonitorableSourceConnector.MonitorableSourceTask::maybeDefineTransactionBoundary` to provide this info; if you agree that it'd be useful, feel free to add it to this PR (regardless of which fix we use):
   
   ```java
           private void maybeDefineTransactionBoundary(SourceRecord record) {
               if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
                   return;
               }
               long transactionSize = nextTransactionBoundary - priorTransactionBoundary;
               // If the transaction boundary ends on an even-numbered offset, abort it
               // Otherwise, commit
               boolean abort = nextTransactionBoundary % 2 == 0;
               calculateNextBoundary();
               if (abort) {
                   log.info("Aborting transaction of {} records", transactionSize);
                   context.transactionContext().abortTransaction(record);
               } else {
                   log.info("Committing transaction of {} records", transactionSize);
                   context.transactionContext().commitTransaction(record);
               }
           }
       }
   ```
   
   And in fact, if we believe this would be useful for all connectors, we could even add this kind of logging to the `ExactlyOnceWorkerSourceTask` class. But that should be done in a separate PR.


[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1591623006

   Thanks @sudeshwasnik for the assessment. Continuing that line of thought: in the `MonitorableSourceConnector#poll` method, it appears that we count down `recordsRemainingLatch` by the number of records in the batch, regardless of whether those records are part of a transaction that may later be aborted: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java#L215-L216
   
   Because of this, `awaitRecords` sees the latch counted down to 0 within the timeout.
   
   However, as you rightly pointed out, `recordsToCommitLatch` is decremented whether the transaction is aborted or committed, which means `awaitCommits` also passes within the timeout.
   
   Could this make the test flaky, since the bound on `recordsRemainingLatch` doesn't seem strong enough to ensure that the number of records actually in the topic matches it? Would it help to decrement `recordsRemainingLatch` only for committed transactions?
   


[GitHub] [kafka] C0urante commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1562913728

   @sambhav-jain-16 thanks for looking into this. I'm prioritizing a few other KIP-related PRs at the moment but will try to make time for this sometime in the next couple of weeks.


[GitHub] [kafka] sudeshwasnik commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sudeshwasnik (via GitHub)" <gi...@apache.org>.
sudeshwasnik commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1592302321

   Hi @sagarrao12, a small correction to my earlier comment:
   ```
   Since assumption 1 is incorrect, we should change the test so that it doesn't expect every record that decrements recordsToCommitLatch to have also been produced.
   ```
   It seems that `recordsRemainingLatch` only waits until X records have been returned by the source task to the framework. Say MINIMUM_MESSAGES = 1000 but MESSAGES_PER_POLL is configured as 100; we then need to wait until the source task delivers 1000 messages to the framework. This keeps `production` time out of the `awaitCommits` assertion timeout (?). WDYT?
   
   Also, the reason the test passes with this PR is that it doesn't validate that X records are present in the topic `when` X countdowns for `awaitCommits` are done. It waits `until` X records are produced into the topic, by which time there could have been many more `commitRecord` calls.
   This assertion doesn't help now because the connector is running continuously, so MINIMUM_MESSAGES `WILL` be produced eventually:
   ```
           assertTrue("Not enough records produced by source connector. Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
                   sourceRecords.count() >= MINIMUM_MESSAGES);
   ```
   


[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1532658288

   Thanks @sambhav-jain-16. One thing I notice is that the `readEndOffsets` method uses `IsolationLevel.READ_UNCOMMITTED` when reading the messages, while the test uses `read_committed`. Ideally, read_uncommitted should yield more records than read_committed, so this is a bit weird. Can you try setting `readEndOffsets` to use read_committed and see if it shows the same behaviour you described above?


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1579930237

   Hi @C0urante, Did you get a chance to take a look at it? TIA


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527055473

   >  How would this fix be any different from that i.e if consume can't get 100 messages within the timeout. Won't it still fail?
   
   Yes it will fail, but `consumeAll` is not failing due to timeout here but rather due to its nature of storing the end offsets before consuming.


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1531223773

   hi @vamossagar12 
   > For starters, can you try increasing CONSUME_RECORDS_TIMEOUT_MS to 100s or as such and see if it even works? 
   
   Actually, I tried it and it failed for me; that's why I started to look into the `consumeAll` method.


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527052521

   @sagarrao12 
   The `consumeAll` method consumes only the records available at the moment we start consuming: it first stores the end offsets and then consumes up to those stored end offsets (for each partition).
   The `consume` method has no such restriction; it consumes until it has received the specified number of records or the timeout expires.
   
   What I suspect is happening is that when the method is initially storing the end offsets of the partitions, the connector hasn't produced 100 records till then and therefore the method doesn't consume fully even though messages are being produced by the connector.
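
   The two stopping rules described above can be contrasted with a toy model. This is a sketch only: `GrowingTopic` is a hypothetical single-partition stand-in, offsets are treated as record counts (no transactions or control records), and the methods merely mimic the behaviour described in this comment rather than the real `EmbeddedKafkaCluster` code.

   ```java
   final class ConsumeSemanticsSketch {
       /** Hypothetical single-partition topic that gains perPoll records on every poll. */
       static final class GrowingTopic {
           private int endOffset;
           private final int perPoll;

           GrowingTopic(int initialRecords, int perPoll) {
               this.endOffset = initialRecords;
               this.perPoll = perPoll;
           }

           int endOffset() {
               return endOffset;
           }

           /** Simulates the connector producing more records while the consumer polls. */
           void producerTick() {
               endOffset += perPoll;
           }
       }

       /** consumeAll-style: snapshot the end offset first, then read only up to it. */
       static int consumeAllStyle(GrowingTopic topic) {
           int target = topic.endOffset();
           int position = 0;
           while (position < target) {
               topic.producerTick();
               position = Math.min(target, topic.endOffset());
           }
           return position;
       }

       /** consume-style: poll until minRecords have been read or maxPolls is exhausted. */
       static int consumeStyle(GrowingTopic topic, int minRecords, int maxPolls) {
           int consumed = 0;
           for (int poll = 0; poll < maxPolls && consumed < minRecords; poll++) {
               topic.producerTick();
               consumed = topic.endOffset();
           }
           return consumed;
       }
   }
   ```

   Starting from 72 records with 10 more arriving per poll, the snapshot-based read stops at 72, while the count-based read keeps polling until it has at least 100.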


[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1548015263

   Thanks @sambhav-jain-16, @kirktrue. To give more context: there are integration tests in Connect for EOS that rely on the `position()` API to assert that some messages are being read, but they seem to be failing now. The tests read the end offsets for the topic partitions and keep consuming messages. The moment the last consumed offset exceeds the end offset, the test stops consuming. Basically this block of code: https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L552-L580
   
   In this case, the assertion fails because even though the last consumed offset moves past the end offset read upfront, the number of records isn't what we expect (even though we have already ensured that that many records were produced successfully). We can go ahead and look at other ways of `fixing` this test, but I wanted to understand whether something fundamental has changed or whether we are missing something.
   
   PS: this is the flaky test in question https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L376
   


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1547732780

   Hi @kirktrue 
   I have looked at the changes from your refactoring PR [here](https://github.com/apache/kafka/pull/13425/files#diff-4d70ec8fc84ee669e88d436de64d66f0d621596d98e6a379df42ede4a0eaf23a). Although I couldn't find any changes in the business logic compared with before, can you confirm that there are no changes in the logic of the `fetchRecords()` method? TIA


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1557655038

   Hi @C0urante @kirktrue
   Can you PTAL at the above comments when available. TIA


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1547640800

   I found the reason why the position reported by `consumer.position()` is greater than the number of records actually consumed.
   
   A transactional producer writes commit messages into the user topic itself, in batches marked as "control" batches. When these messages are consumed, the consumer deliberately skips them and doesn't add them to the returned `records` list. However, the offsets still move forward so that consumption continues with the following batches.
   
   https://github.com/apache/kafka/blob/fa7818dff5a28048401654a7497e56dbc988b755/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java#L214-L222
   
   Now the issue with the `consumeAll` method is that it exits when it reaches particular offsets, rather than after consuming a certain number of messages as `consume` does. I think if the purpose of this test is to consume a minimum number of messages rather than to reach certain offsets, we should use `consume`. However, if `consumeAll` is intended to do something along the same lines (which its Javadoc suggests), we should change the `consumeAll` method. @C0urante, can you comment on this?
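
   The offset-versus-record-count mismatch can be shown with a tiny model of a partition log. This is a sketch under simplifying assumptions: the log is a plain list, the `MARKER` string is a hypothetical stand-in for a transaction control record, and `readAll` only imitates the skipping behaviour described above rather than the real fetch code.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class ControlRecordSketch {
       /** Hypothetical stand-in for a transaction control record in the log. */
       static final String MARKER = "<control>";

       /** Records handed back to the caller plus the position reached in the log. */
       static final class ReadResult {
           final List<String> records;
           final long position;

           ReadResult(List<String> records, long position) {
               this.records = records;
               this.position = position;
           }
       }

       /** Reads the whole log: markers are skipped, yet each one still occupies an offset. */
       static ReadResult readAll(List<String> log) {
           List<String> records = new ArrayList<>();
           long position = 0;
           for (String entry : log) {
               if (!MARKER.equals(entry)) {
                   records.add(entry);
               }
               position++;
           }
           return new ReadResult(records, position);
       }
   }
   ```

   For a log of `r1, r2, <control>, r3, <control>`, the caller receives three records while the position ends at 5, so a loop that stops on offsets can exit with fewer records than the offsets suggest.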
   


[GitHub] [kafka] yashmayya commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527292922

   Thanks @sambhav-jain-16 
   
   > What I suspect is happening is that when the method is initially storing the end offsets of the partitions, the connector hasn't produced 100 records till then and therefore the method doesn't consume fully even though messages are being produced by the connector.
   
   I'm not sure how this is possible given that we're waiting for `MINIMUM_MESSAGES` to be committed first: https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L399-L410 (i.e. `SourceTask::commitRecord` is called `MINIMUM_MESSAGES` number of times).
   
   Records are "committed" only after the producer transaction is committed successfully -
   
   https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L302-L332


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1533239690

   Hi @vamossagar12,
   I can set that, but `endoffsets` is actually being populated with the right values. The issue is with the way `lastConsumedOffset` is being used.
   


[GitHub] [kafka] sudeshwasnik commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sudeshwasnik (via GitHub)" <gi...@apache.org>.
sudeshwasnik commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1590627003

   Hi @sambhav-jain-16 @C0urante, here's what I think may be happening.
   The test is written in a way that:
   1. Assumes/expects that when `recordsToCommitLatch` has been decremented X times (in `awaitCommits`), there are X records in the topic.
   2. Never stops the source connector. It keeps running in the background, producing more records into the topic.
   
   The test doesn't check `awaitCommits` and assert on the consumed records at the same instant. If `awaitCommits` was checked at time t1 and records were consumed at t2, more records may have been produced into the topic in the interval (t1, t2). Thus the assertion is not strong enough (we should try to stop the source connector at or before t1). IMO the (t1, t2) gap is the reason for the flakiness, because assumption `1` is itself wrong; the test may simply have passed `some` of the time.
   
   Here is why assumption `1` is wrong, and why we can't expect X records to be present in the topic when `recordsToCommitLatch` has been decremented X times.
   
   Say r1, ..., rY were sent by the producer successfully in a transaction `txn1` (not completed yet). Then all of r1..rY are stored in [`commitableRecords`](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L194) from this [line](https://github.com/apache/kafka/blob/3ddb62316f287d74b9649e1821c62fe8f68bc6e6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L422).
   Now, if `rY` says `txn1` [has to be aborted](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L461), `txn1` is aborted and all records `r1, ..., rY` are dropped (essentially unreadable).
   But we still return `true` ([link](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L467)) from `shouldCommitTransactionForRecord`, which means we still go through `commitTransaction`. In `commitTransaction` we end up calling `commitTaskRecord` for each record in `commitableRecords` (r1, ..., rY). For each such `commitRecord`, `recordsToCommitLatch` is eventually [decremented](https://github.com/apache/kafka/blob/3ae1afa43838066e44ea78918050c6780c208042/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java#L166-L170).
   So in the above case, [r1, ..., rY] were never effectively produced (no consumer can read them, since they are from an aborted transaction), but this [assertion will still be true](https://github.com/apache/kafka/blob/9e74f91e56dbc06f17c95fe80dd3923f7b713457/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L405).
   
   Since assumption `1` is incorrect, we should change the test so that it doesn't expect every record that decrements `recordsRemainingLatch` to have also been produced.
   
   IMO the flakiness is just due to the source connector running in the background and continuously producing records.
   
   
   
   


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1532489606

   I think I get what is happening.
   The consumer is polling the records here https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L577
   
   However while getting the `lastConsumedOffset` https://github.com/apache/kafka/blob/512fd6e5cbc49371e9e761bc1973342f639abeb4/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L567
   
   The consumer might have consumed more records in the background meanwhile and that's why it is stopping prematurely.
   IMO we should fix the `consumeAll` method itself in this PR.
   @yashmayya @sagarrao12 
   


[GitHub] [kafka] sagarrao12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sagarrao12 (via GitHub)" <gi...@apache.org>.
sagarrao12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527038626

   Thanks @sambhav-jain-16. Going by the build, this seems to have fixed the test, but I am still curious as to why it used to fail expecting 100+ but got 72. How would this fix be any different from that i.e if consume can't get 100 messages within the timeout. Won't it still fail?


[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "vamossagar12 (via GitHub)" <gi...@apache.org>.
vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527703722

   Yeah I agree with @yashmayya . Moreover this 
   
   ```
   Yes it will fail, but consumeAll is not failing due to timeout here but rather due to its nature of storing the end offsets before consuming.
   ```
   
   is not entirely correct, I think. I agree that what gets thrown is an AssertionError, but that's because the number of sourceRecords returned by `consumeAll` didn't meet the desired number of records within 60s. For starters, can you try increasing `CONSUME_RECORDS_TIMEOUT_MS` to 100s or as such and see if it even works? Basically, we need to check whether the consumer is lagging or whether enough records are being produced. I think it would mostly be the former because, as Yash said, we are anyway waiting for 100 records to be committed. It's not an ideal fix, but let's first see if it works, and if needed we can dig deeper.


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1562368522

   Hi @kirktrue @C0urante 
   Can you PTAL at the above comments when available. TIA


[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "sambhav-jain-16 (via GitHub)" <gi...@apache.org>.
sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1632121695

   Thanks @C0urante for the response.
   
   I have applied the suggested changes. PTAL
   
   > And in fact, if we believe this would be useful for all connectors, we could even add this kind of logging to the ExactlyOnceWorkerSourceTask class. But that should be done in a separate PR.
   
   I'll create a ticket for this and link it in this PR's comments.
   


[GitHub] [kafka] C0urante merged pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13646:
URL: https://github.com/apache/kafka/pull/13646

