Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/10 03:00:26 UTC

[GitHub] [beam] jfarr opened a new pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

jfarr opened a new pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090
 
 
   I believe the way this test was written may have made it sensitive to the order of concurrent operations. This implementation should not be. r: @aromanenko-dev 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598122073
 
 
   retest this please

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601097324
 
 
   retest this please

[GitHub] [beam] aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-599641222
 
 
   > The test case waits for 5 elements from shardReadersPool:
   
   I think I disagree here: `fetchedRecords` will contain at most 4 elements, and they will always be "a", "b", "c" and "d", though possibly in a different order. Empty batches never produce a record from `shardReadersPool.nextRecord()`; however, they are still passed to `rateLimiter.onSuccess()` as an argument (which I think is unnecessary). 
   Let me explain how I understand this test; @jfarr, please correct me if I'm wrong. We only need this loop to keep running while `fetchedRecords.size() < 4` to make sure that the 2 reader threads have read all 4 records. Then we assert that `rateLimiter` behaved correctly in the meantime, since that is the real purpose of this test.
   
   I think the new version of this test with the timeout (last commit) should be fine. I have tested it locally several times with 10k runs and have seen no failures so far.
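To illustrate the point that `fetchedRecords` can only ever hold "a", "b", "c" and "d", here is a small plain-JDK simulation. It is a hypothetical sketch, not Beam's `ShardReadersPool`: two threads stand in for the shard readers and push string records into a `LinkedBlockingQueue`, the throttling exception from the first iterator is left out, and a `null` from `poll()` plays the role of an empty `CustomOptional` from `nextRecord()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of the two shard readers in the test; not Beam's ShardReadersPool.
final class FetchedRecordsSketch {

  static List<String> collectFourRecords() throws InterruptedException {
    BlockingQueue<String> recordsQueue = new LinkedBlockingQueue<>();
    // Batches loosely follow the iterators described in the thread; the throttling
    // exception thrown by the first iterator is left out.
    List<List<String>> firstShard = List.of(List.of("a", "b"), List.of(), List.of());
    List<List<String>> secondShard = List.of(List.of("c"), List.of("d"), List.of());

    Thread first = new Thread(() -> enqueueAll(firstShard, recordsQueue));
    Thread second = new Thread(() -> enqueueAll(secondShard, recordsQueue));
    first.start();
    second.start();

    List<String> fetchedRecords = new ArrayList<>();
    while (fetchedRecords.size() < 4) {
      // A null from poll() stands in for an empty CustomOptional from nextRecord().
      String next = recordsQueue.poll(10, TimeUnit.MILLISECONDS);
      if (next != null) {
        fetchedRecords.add(next);
      }
    }
    first.join();
    second.join();
    return fetchedRecords;
  }

  private static void enqueueAll(List<List<String>> batches, BlockingQueue<String> queue) {
    for (List<String> batch : batches) {
      // An empty batch adds nothing, so only a, b, c and d can ever reach the queue.
      queue.addAll(batch);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> fetched = collectFourRecords();
    List<String> sorted = new ArrayList<>(fetched);
    Collections.sort(sorted);
    // The arrival order varies between runs; the sorted contents do not.
    System.out.println("fetched=" + fetched + " sorted=" + sorted);
  }
}
```

Sorting before comparing is what makes the check independent of which reader thread happened to run first.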

[GitHub] [beam] suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598926854
 
 
   In the test case, the two iterators return the following data:
   
   - firstIterator: `throw(e)`;  `[a, b]`;   `[]`;   `[]`;   `[]`;   `[]` ...
   - secondIterator: `[c]`;   `[d]`;   `[]`;   `[]`;   `[]` ...
   
   The test case waits for 5 elements from shardReadersPool:
   
   ```
       List<KinesisRecord> fetchedRecords = new ArrayList<>();
       while (fetchedRecords.size() < 4) {
         CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
         if (nextRecord.isPresent()) {
           fetchedRecords.add(nextRecord.get());
         }
       }
   ```
   
   It does not care what the contents of the 5 elements are.
   
   The test's expectation is to capture the following 4 elements:
   
   ```
       verify(customRateLimitPolicy).onThrottle(same(e));
       verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
   ```
   
   My assumption: there's no guarantee that 2 elements from firstIterator and secondIterator are served evenly when 5 elements are consumed from them.
   
   My suggestion: make the while loop run until it confirms that the expected 5 elements (at least [a,b] and [d]), not just their count, are in the `fetchedRecords` variable.
   
    (Sorry if this misses the point; I wrote this without checking @jfarr's updated solution.)
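The suggestion to stop on contents rather than on a count could look roughly like the helper below. The name `drainUntilContains` and the deadline are assumptions made for this sketch; the deadline is there so that a run which never sees the expected records fails instead of hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: drain a queue until all expected records have been seen,
// instead of stopping after a fixed number of records.
final class WaitForContents {

  static <T> List<T> drainUntilContains(
      BlockingQueue<T> queue, List<T> expected, long timeoutMillis) throws InterruptedException {
    List<T> fetched = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!fetched.containsAll(expected)) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        // Fail loudly rather than spinning forever when a record never arrives.
        throw new AssertionError("Timed out; fetched only " + fetched);
      }
      T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
      if (next != null) {
        fetched.add(next);
      }
    }
    return fetched;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("c", "a", "b", "d"));
    // Stops only once a, b and d have all been seen, whatever order they arrive in.
    System.out.println(drainUntilContains(queue, List.of("a", "b", "d"), 1_000));
  }
}
```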

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601102227
 
 
   retest this please

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390700972
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   @suztomo My thought is that Mockito is expecting to see these parameters in this order, which may not always be the case due to the 2 ShardReadersPool worker threads. My suggested fix is to rewrite the Mockito verifications so that they are definitely not dependent on order.
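For reference, plain Mockito `verify()` calls do not check call order on their own; ordering is only enforced when verifying through an `InOrder` object. The order-independent comparison of captured batches can still be sketched in plain Java. This hypothetical helper is not part of Beam, Mockito or AssertJ. Unlike the AssertJ `contains()` check in the diff, it also fails if an unexpected non-empty batch was reported, and it ignores empty batches because how many of them arrive depends on thread timing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam, Mockito or AssertJ.
final class UnorderedBatches {

  /**
   * True if both lists hold the same non-empty batches with the same multiplicities,
   * in any order. Empty batches are ignored.
   */
  static <T> boolean sameNonEmptyBatchesIgnoringOrder(
      List<List<T>> captured, List<List<T>> expected) {
    return countNonEmpty(captured).equals(countNonEmpty(expected));
  }

  private static <T> Map<List<T>, Integer> countNonEmpty(List<List<T>> batches) {
    Map<List<T>, Integer> counts = new HashMap<>();
    for (List<T> batch : batches) {
      if (!batch.isEmpty()) {
        counts.merge(batch, 1, Integer::sum); // List equality is element-wise
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Captured in a different order than the original verify() calls, plus empty batches.
    List<List<String>> captured =
        List.of(List.of("c"), List.of(), List.of("a", "b"), List.of(), List.of("d"));
    List<List<String>> expected = List.of(List.of("a", "b"), List.of("c"), List.of("d"));
    System.out.println(sameNonEmptyBatchesIgnoringOrder(captured, expected)); // true
  }
}
```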

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601117023
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-599641222
 
 
   > The test case waits for 5 elements from shardReadersPool:
   
   I think I disagree here: `fetchedRecords` will contain at most 4 elements, and they will always be "a", "b", "c" and "d", though possibly in a different order. Empty batches never produce a record from `shardReadersPool.nextRecord()`; however, they are still passed to `rateLimiter.onSuccess()` as an argument (which I think is unnecessary). 
   Let me explain how I understand this test; @jfarr, please correct me if I'm wrong. We only need this loop to keep running while `fetchedRecords.size() < 4` to make sure that the 2 reader threads have read all 4 records. Then we assert that `rateLimiter` behaved correctly in the meantime, since that is the real purpose of this test.
   
   I think the new version of this test with the timeout (last commit) should be fine. I have tested it locally several times with 10k runs and have seen no failures so far.

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r393165574
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 ##########
 @@ -126,9 +126,21 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
   private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
     while (poolOpened.get()) {
       try {
-        List<KinesisRecord> kinesisRecords;
         try {
-          kinesisRecords = shardRecordsIterator.readNextBatch();
+          List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
+          try {
+            for (KinesisRecord kinesisRecord : kinesisRecords) {
+              recordsQueue.put(kinesisRecord);
+              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+            }
+          } finally {
+            // One of the paths into this finally block is recordsQueue.put() throwing
+            // InterruptedException so we should check the thread's interrupted status before
+            // calling onSuccess().
+            if (!Thread.currentThread().isInterrupted()) {
+              rateLimiter.onSuccess(kinesisRecords);
 
 Review comment:
   Do we really need to pass an empty list of records to `rateLimiter.onSuccess()` when that is what we happen to receive from `shardRecordsIterator.readNextBatch()`?  
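If empty batches were skipped, the step in the diff might look roughly like the sketch below. It is a simplified, hypothetical stand-in: `SuccessCallback` replaces `RateLimitPolicy`, and the per-shard counters are omitted. One caveat worth checking in the diff: blocking methods such as `LinkedBlockingQueue.put()` clear the thread's interrupt status when they throw `InterruptedException`, so an `isInterrupted()` check in the `finally` block may not detect that path. The sketch therefore records the interruption in a local flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified version of one readLoop() step; not the code under review.
final class ReadStepSketch {

  /** Stand-in for Beam's RateLimitPolicy, reduced to the one callback discussed here. */
  interface SuccessCallback<T> {
    void onSuccess(List<T> records);
  }

  static <T> void processBatch(
      List<T> batch, BlockingQueue<T> recordsQueue, SuccessCallback<T> rateLimiter)
      throws InterruptedException {
    boolean interrupted = false;
    try {
      for (T record : batch) {
        recordsQueue.put(record);
      }
    } catch (InterruptedException e) {
      // A blocking put() clears the interrupt status when it throws, so remember it here.
      interrupted = true;
      throw e;
    } finally {
      // Report only non-empty batches, and never after an interrupted put().
      if (!interrupted && !batch.isEmpty()) {
        rateLimiter.onSuccess(batch);
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<List<String>> reported = new ArrayList<>();
    processBatch(List.of("a", "b"), queue, reported::add);
    processBatch(List.of(), queue, reported::add); // empty batch: no callback
    System.out.println("queued=" + queue + " reported=" + reported);
  }
}
```

Whether the rate limit policy should see empty batches at all is a design question for the reviewers; the sketch only shows what skipping them would involve.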

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390376749
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Would you share your thoughts on why customRateLimitPolicy did not receive `[d]` even though it received `[a, b]` and `[c]`?
   I'd like to understand the problem with "the order of concurrent operations" in this test case better.
   
   https://builds.apache.org/job/beam_PreCommit_Java_Commit/10277/testReport/junit/org.apache.beam.sdk.io.kinesis/ShardReadersPoolTest/shouldCallRateLimitPolicy/
   
   > Argument(s) are different! Wanted:
   > customRateLimitPolicy.onSuccess(
   >    [d]
   > );
   > -> at org.apache.beam.sdk.io.kinesis.ShardReadersPoolTest.shouldCallRateLimitPolicy(ShardReadersPoolTest.java:330)
   
   

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391685329
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   @aromanenko-dev Thank you very much. I can now reproduce the problem.
   
   ![image](https://user-images.githubusercontent.com/28604/76535379-23902800-6451-11ea-8803-e0aedd85c15b.png)
   
   Everyone is in different time zones.
   

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391914284
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   OK, I see what's happening here. It's basically a race condition. This loop will exit as soon as nextRecord() returns the last record:
   https://github.com/apache/beam/blob/b0e7afbc3baf4ac17f7159cb8aa1fabe326be3e2/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java#L320-L325
   
   I believe that sometimes the test reaches the verify() call for that last record in the split second between shardReadersPool putting the record into the blocking queue and calling onSuccess(). The fix is to use Mockito's timeout() to ensure that we wait long enough for that last onSuccess() call. It took me about 30,000 iterations to reproduce the failure with the original code, but I have run the new test code over 100,000 times without a failure.
   
   I do still think it's a good idea to make these changes to ShardReadersPool to ensure that onSuccess() always gets called (as long as the thread wasn't interrupted).
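The window described above can be reproduced deterministically in a small plain-JDK sketch. The `CountDownLatch` is an artificial device (an assumption of this sketch, not something in `ShardReadersPool`) that holds the reader thread between its `put()` and its `onSuccess()` call, and `waitUntil` is a hypothetical helper that plays the role of Mockito's `timeout()`.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical reproduction of the race; the latch stretches the window between the
// reader's put() and its onSuccess() call so the outcome is deterministic.
final class LastRecordRace {

  /** Returns {seen immediately after take(), seen after waiting with a timeout}. */
  static boolean[] run() throws InterruptedException {
    BlockingQueue<String> recordsQueue = new LinkedBlockingQueue<>();
    List<List<String>> onSuccessCalls = new CopyOnWriteArrayList<>();
    CountDownLatch gate = new CountDownLatch(1);

    Thread reader = new Thread(() -> {
      try {
        recordsQueue.put("d");
        gate.await(); // the "split second" between put() and onSuccess()
        onSuccessCalls.add(List.of("d"));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    reader.start();

    recordsQueue.take(); // the test's loop ends as soon as the last record arrives...
    boolean seenImmediately = onSuccessCalls.contains(List.of("d")); // ...too early

    gate.countDown(); // let the reader reach onSuccess()
    boolean seenWithTimeout = waitUntil(() -> onSuccessCalls.contains(List.of("d")), 1_000);
    reader.join();
    return new boolean[] {seenImmediately, seenWithTimeout};
  }

  /** Re-checks a condition until it holds or the timeout elapses. */
  static boolean waitUntil(BooleanSupplier condition, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      Thread.sleep(5);
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    boolean[] result = run();
    System.out.println("immediate=" + result[0] + " withTimeout=" + result[1]);
  }
}
```

With Mockito itself, the same idea is expressed as, for example, `verify(customRateLimitPolicy, timeout(1000)).onSuccess(eq(singletonList(d)));`, where `Mockito.timeout(long millis)` keeps re-checking the verification for up to that many milliseconds. The 1000 ms value is illustrative, not necessarily the one used in the PR.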

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601117023
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598926854
 
 
   In the test case, the two iterators return the following data:
   
   - firstIterator: `throw(e)`;  `[a, b]`;   `[]`;   `[]`;   `[]`;   `[]` ...
   - secondIterator: `[c]`;   `[d]`;   `[]`;   `[]`;   `[]` ...
   
   The test case waits for 5 elements from shardReadersPool:
   
   ```
       List<KinesisRecord> fetchedRecords = new ArrayList<>();
       while (fetchedRecords.size() < 4) {
         CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
         if (nextRecord.isPresent()) {
           fetchedRecords.add(nextRecord.get());
         }
       }
   ```
   
   The test's expectation was:
   
   ```
       verify(customRateLimitPolicy).onThrottle(same(e));
       verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
   ```
   
   My assumption: there's no guarantee that 2 elements from firstIterator and secondIterator are served evenly when 5 elements are consumed.
   
   My suggestion: make the while loop run until it confirms that the expected 5 elements are in the `fetchedRecords` variable.
   
    (Sorry if this misses the point; I wrote this without checking @jfarr's updated solution.)

[GitHub] [beam] suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598926854
 
 
   In the test case, the two iterators return the following data:
   
   - firstIterator: `throw(e)`;  `[a, b]`;   `[]`;   `[]`;   `[]`;   `[]` ...
   - secondIterator: `[c]`;   `[d]`;   `[]`;   `[]`;   `[]` ...
   
   The test case waits for 5 elements from shardReadersPool:
   
   ```
       List<KinesisRecord> fetchedRecords = new ArrayList<>();
       while (fetchedRecords.size() < 4) {
         CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
         if (nextRecord.isPresent()) {
           fetchedRecords.add(nextRecord.get());
         }
       }
   ```
   
   It does not care what the contents of the 5 elements are.
   
   The test's expectation is to capture the following 4 elements:
   
   ```
       verify(customRateLimitPolicy).onThrottle(same(e));
       verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
   ```
   
   My assumption: there's no guarantee that the elements from firstIterator and secondIterator are served evenly by the time 4 records have been consumed from them.
   
   My suggestion: make the while loop condition check that the expected records (at least [a, b] and [d]) are in the `fetchedRecords` variable, rather than just counting the number of items.
   
    (Sorry if this is missing the point, I wrote this without checking @jfarr's updated solution)
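   A minimal sketch of the content-based loop condition suggested above, under stated assumptions: a `BlockingQueue` stands in for `shardReadersPool.nextRecord()`, and `AwaitExpected` / `drainUntilAllSeen` are hypothetical names, not part of Beam. Note that this only checks which records were fetched; it does not by itself wait for the rate limit policy callbacks, which matters given the later debug output in this thread showing "d" read but not yet captured.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; the queue stands in for shardReadersPool.nextRecord().
final class AwaitExpected {
  private AwaitExpected() {}

  /** Polls until every expected element has been fetched, or fails after timeoutMillis. */
  static <T> List<T> drainUntilAllSeen(BlockingQueue<T> source, Set<T> expected, long timeoutMillis) {
    List<T> fetched = new ArrayList<>();
    Set<T> missing = new HashSet<>(expected);
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!missing.isEmpty()) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        throw new AssertionError("Timed out, still missing: " + missing);
      }
      T next;
      try {
        // A null result plays the role of an absent CustomOptional.
        next = source.poll(remaining, TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for records", e);
      }
      if (next != null) {
        fetched.add(next);
        missing.remove(next);
      }
    }
    return fetched;
  }
}
```

   The exit condition names the records the later assertions depend on, instead of a count that can be satisfied by a different mix of batches.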


[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601121044
 
 
   Run Java_Examples_Dataflow PreCommit


[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391365716
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Bug: sometimes `customRateLimitPolicy` does not receive some of the items, e.g. "[d]" in the test case.


[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391390299
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   @aromanenko-dev If you could share how you got `@RepeatedTest` working, I'd be happy to retest.


[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391291506
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Yes, thanks Alexey. I was just working on something similar. Maybe the test is just overspecified then. I propose we change the validations to just:
   ```
   verify(customRateLimitPolicy).onThrottle(any(KinesisClientThrottledException.class));
   verify(customRateLimitPolicy, atLeastOnce()).onSuccess(any(List.class));
   ```
   If you both agree I'll make that change.


[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391779556
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   I did some testing on my side, and I think the problem is that the same `RateLimitPolicy` mock is used by both `ShardRecordsIterator`s, which are called from different threads. For example, when the failed assertion says that the "d" record is absent from `capturedRecords`, I can actually see (using debug output) that it was read with `shardRecordsIterator.readNextBatch()` in `readLoop()`.
   When the real `RateLimitPolicy` is used in normal code, a separate instance is created for every `readLoop()` thread (right?). So we need to test it the same way, with a separate mock policy per thread.
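   One way the observation above ("d" read, but not in `capturedRecords`) can arise is sketched below, assuming that each reader thread puts a batch into the shared queue before it calls `onSuccess()` for that batch. `EnqueueThenCallback` is a hypothetical model, not Beam code; a `CountDownLatch` holds the reader between the two steps so that a window which is normally only a few instructions wide stays open deterministically.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of one reader thread: enqueue the batch first, then call onSuccess().
final class EnqueueThenCallback {
  private EnqueueThenCallback() {}

  /** Returns {callbacks seen when the consumer got "d", callbacks seen after the reader finished}. */
  static int[] observe() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    List<List<String>> onSuccessCalls = new CopyOnWriteArrayList<>();
    CountDownLatch consumerChecked = new CountDownLatch(1);

    Thread reader = new Thread(() -> {
      List<String> batch = Collections.singletonList("d");
      try {
        for (String record : batch) {
          queue.put(record);        // 1. the record becomes visible to the consumer
        }
        consumerChecked.await();    // 2. hold the reader inside the race window
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      onSuccessCalls.add(batch);    // 3. only now is the policy callback recorded
    });
    reader.start();

    int seenByConsumer;
    try {
      queue.take();                              // consumer fetches "d"
      seenByConsumer = onSuccessCalls.size();    // what an immediate verify() would see
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      consumerChecked.countDown();               // always release the reader
    }
    try {
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {seenByConsumer, onSuccessCalls.size()};
  }
}
```

   In the real test nothing holds the reader back, so the window is tiny, which would be consistent with the failure appearing only a couple of times in thousands of runs.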


[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391151296
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   I ran this test thousands of times locally (with `@RepeatedTest(10000)` and some hacks to run it on JUnit 5) and managed to reproduce the failure only twice, with an error similar to, but a bit different from, the one on Jenkins.
   
   Jenkins error: https://gist.github.com/aromanenko-dev/f9714c3dd5a47a9439b9245f3ef7b04b
   My local error (old test version): https://gist.github.com/aromanenko-dev/9a8acf265b96e010ad0ad1d09bb3ab92
   
   What they have in common is that it starts emitting an empty records list after returning the list with the `c` value.
   
   I also did the same for the new version of the test, and the failure is reproducible there too (2 failures in 1000 runs):
   My local error (new test version) 1st fail: https://gist.github.com/aromanenko-dev/faa4229a486117d7504e486282503009
   My local error (new test version) 2nd fail: https://gist.github.com/aromanenko-dev/751cd007bb89a4f6e67f1ce9f9ef3d25
   
   As you can see, the errors with the old and new versions of the test are similar. So it seems the problem is not related to how `verify` is used, but rather to how each mock `ShardRecordsIterator` returns the records: for some reason, it starts returning just an empty list.
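   Without the JUnit 5 setup, a rough way to estimate a flake rate like "2 failures in 1000 runs" is to invoke the check repeatedly and count failures. `FlakeCounter` is a hypothetical stdlib-only helper, not part of Beam or JUnit, and a sketch rather than a replacement for `@RepeatedTest`; it assumes each run builds its own fresh mocks and pool.

```java
// Hypothetical helper: run a check repeatedly and count the runs that fail.
final class FlakeCounter {
  private FlakeCounter() {}

  static int countFailures(int runs, Runnable check) {
    int failures = 0;
    for (int i = 0; i < runs; i++) {
      try {
        check.run();
      } catch (AssertionError | RuntimeException e) {
        // AssertionErrors (what JUnit assertions throw) and unexpected runtime exceptions both count.
        failures++;
      }
    }
    return failures;
  }
}
```

   At a rate of a few failures per thousand runs, hundreds of iterations may pass before the first failure, so a clean run of a few dozen iterations says little.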


[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601121044
 
 
   Run Java_Examples_Dataflow PreCommit


[GitHub] [beam] aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-597027748
 
 
   Thanks! Please fix the Spotless issue and add `[BEAM-9470]` as a prefix to the commit message.


[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601124492
 
 
   Run Java_Examples_Dataflow PreCommit


[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391917813
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   > Since, in case of using the real RateLimitPolicy in normal code, the instances of them will be created per every readLoop() thread (right?). So, we need to test it in the same way with mock policies.
   
   It's possible for a custom RateLimitPolicyFactory to return a singleton instance of RateLimitPolicy, and I think that should be fine (as long as it's thread-safe). So maybe we shouldn't assume that they will always be different instances.
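   To illustrate the singleton case, here is a sketch of a policy that is safe to share between reader threads because its only state lives in concurrent collections. `BatchPolicy` and `RecordingPolicy` are hypothetical stand-ins, not Beam's `RateLimitPolicy`, and the two threads only imitate the two `readLoop()` threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a rate limit policy; not Beam's actual interface. */
interface BatchPolicy<T> {
  void onSuccess(List<T> records);

  void onThrottle(Exception e);
}

/** Safe to share as a singleton: all mutable state is in thread-safe structures. */
final class RecordingPolicy<T> implements BatchPolicy<T> {
  private final ConcurrentLinkedQueue<List<T>> successes = new ConcurrentLinkedQueue<>();
  private final AtomicInteger throttles = new AtomicInteger();

  @Override
  public void onSuccess(List<T> records) {
    successes.add(records);
  }

  @Override
  public void onThrottle(Exception e) {
    throttles.incrementAndGet();
  }

  List<List<T>> successes() {
    return new ArrayList<>(successes);
  }

  int throttles() {
    return throttles.get();
  }

  /** Shares one instance between two "reader" threads, as a singleton-returning factory would. */
  static RecordingPolicy<String> runTwoReaders() {
    RecordingPolicy<String> shared = new RecordingPolicy<>();
    Thread first = new Thread(() -> {
      shared.onThrottle(new RuntimeException("throttled"));
      shared.onSuccess(Arrays.asList("a", "b"));
    });
    Thread second = new Thread(() -> {
      shared.onSuccess(Collections.singletonList("c"));
      shared.onSuccess(Collections.singletonList("d"));
    });
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return shared;
  }
}
```

   Because callbacks from the two threads can interleave in any order, assertions on a shared instance should check membership rather than a fixed sequence.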


[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-600158638
 
 
   I think it's almost LGTM; it just needs some Javadoc updates about using the rate limiter from multiple threads.


[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391484239
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Sure, this patch is enough to run just this test, `shouldCallRateLimitPolicy()`, with JUnit 5:
   https://gist.github.com/aromanenko-dev/02c4b206bd7c64bec3ee7a4e3588a5b6
   I didn't try the other tests, so please don't blame me if something doesn't work =) Sorry for the delay, I'm in a different time zone.


[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391300387
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   I disagree (I’m not a committer). We just revealed a bug, which is very hard to reproduce.
   
   @aromanenko-dev 
   
   > @RepeatedTest(10000) and some hacks to run it on Junit5
   
   Would you share this?


[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391228832
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   @aromanenko-dev Thank you for sharing your result.


[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390762635
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Thanks, that's how I thought `Mockito.verify()` worked too, but this made me doubt it. I didn't do any detailed testing, though, so thanks for taking the time to validate. I'm honestly at a loss as to what could be causing the intermittent failures, then. All I can say is that this version of the test is semantically equivalent and might not be subject to the same failure mode. I have not been able to reproduce the failure myself.


[GitHub] [beam] aromanenko-dev merged pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090
 
 
   


[GitHub] [beam] jfarr commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-600950516
 
 
   > I think it's almost LGTM, just requires some Javadoc updates on using rate limiter in multiple threads.
   
   Thanks @aromanenko-dev, that's done now. And thank you @suztomo for pushing me on this and keeping me honest.


[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601120924
 
 
   Run Java_Examples_Dataflow PreCommit


[GitHub] [beam] suztomo commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598926854
 
 
   In the test case, the two iterators return the following data:
   
   - firstIterator: `throw(e)`;  `[a, b]`;   `[]`;   `[]`;   `[]`;   `[]` ...
   - secondIterator: `[c]`;   `[d]`;   `[]`;   `[]`;   `[]` ...
   
   The test case waits for 5 elements from shardReadersPool:
   
   ```
       List<KinesisRecord> fetchedRecords = new ArrayList<>();
       while (fetchedRecords.size() < 4) {
         CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
         if (nextRecord.isPresent()) {
           fetchedRecords.add(nextRecord.get());
         }
       }
   ```
   
   The test's expectation was:
   
   ```
       verify(customRateLimitPolicy).onThrottle(same(e));
       verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
       verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
   ```
   
   My assumption: there's no guarantee that the elements from firstIterator and secondIterator are served evenly (2 from each) when 5 elements are consumed.
   
   My suggestion: make the while loop run until it confirms that the expected 5 elements are in the `fetchedRecords` variable.
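   A minimal sketch of that suggestion, assuming a plain `BlockingQueue<T>` in place of `shardReadersPool` (the `PollUntil` helper and its method name are hypothetical, not Beam code). The loop condition checks the contents of the fetched list rather than its size, and a deadline keeps a missing element from hanging the test forever.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical test helper: drains a queue until every expected element has been seen. */
   final class PollUntil {
     private PollUntil() {}
   
     static <T> List<T> pollUntilContainsAll(
         BlockingQueue<T> queue, Collection<T> expected, long timeoutMillis)
         throws InterruptedException {
       List<T> fetched = new ArrayList<>();
       long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
       // Loop on what has been fetched, not on how many items were fetched, so the
       // outcome does not depend on which producer thread happened to run first.
       while (!fetched.containsAll(expected)) {
         long remaining = deadline - System.nanoTime();
         if (remaining <= 0) {
           throw new AssertionError("Timed out; fetched only " + fetched);
         }
         T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
         if (next != null) {
           fetched.add(next);
         }
       }
       return fetched;
     }
   }
   ```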
   

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598122164
 
 
   retest this please

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391484239
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Sure, this patch is enough to run only this test, `shouldCallRateLimitPolicy()`, with JUnit 5:
   https://gist.github.com/aromanenko-dev/02c4b206bd7c64bec3ee7a4e3588a5b6
   I know it's not a perfect solution (it's very hacky), but it allowed me to run this test as many times as I needed. I didn't try the other tests, so please don't blame me if something doesn't work =) 
   Sorry for the delay; I guess we're in different time zones.
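   For readers without the gist at hand, a dependency-free way to hammer a flaky check is to run it in a loop and count failures. This is a hypothetical helper, not the contents of the gist; JUnit 5's `@RepeatedTest(n)` annotation is the framework-native alternative.
   
   ```java
   /** Hypothetical helper: repeats a check and counts how many runs failed. */
   final class Repeat {
     private Repeat() {}
   
     static int countFailures(Runnable check, int runs) {
       int failures = 0;
       for (int i = 0; i < runs; i++) {
         try {
           check.run();
         } catch (AssertionError e) {
           // Count the failed run instead of aborting, so the failure rate
           // of a flaky check becomes visible.
           failures++;
         }
       }
       return failures;
     }
   }
   ```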

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r393216743
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 ##########
 @@ -126,9 +126,21 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
   private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
     while (poolOpened.get()) {
       try {
-        List<KinesisRecord> kinesisRecords;
         try {
-          kinesisRecords = shardRecordsIterator.readNextBatch();
+          List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
+          try {
+            for (KinesisRecord kinesisRecord : kinesisRecords) {
+              recordsQueue.put(kinesisRecord);
+              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+            }
+          } finally {
+            // One of the paths into this finally block is recordsQueue.put() throwing
+            // InterruptedException so we should check the thread's interrupted status before
+            // calling onSuccess().
+            if (!Thread.currentThread().isInterrupted()) {
+              rateLimiter.onSuccess(kinesisRecords);
 
 Review comment:
   Yes. In order to rate limit correctly, onSuccess() needs to be called after every successful getRecords() call, even if it returned no records. Otherwise we would be right back to exceeding our API rate limit once the shard has no more records to process.
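   A simplified sketch of the loop shape described here, under stated assumptions: an `Iterator<List<T>>` stands in for `ShardRecordsIterator`, and `SuccessPolicy` is a hypothetical one-method stand-in for `RateLimitPolicy`. Every batch that is read, including an empty one, reaches `onSuccess()`, while the `finally` block skips the callback if the thread was interrupted during enqueueing.
   
   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   
   /** Hypothetical stand-in for RateLimitPolicy, reduced to the callback used here. */
   interface SuccessPolicy<T> {
     void onSuccess(List<T> batch);
   }
   
   final class ReadLoopSketch {
     private ReadLoopSketch() {}
   
     /** Reads batches until exhausted; onSuccess() fires for every batch, even empty ones. */
     static <T> void readAll(
         Iterator<List<T>> batches, BlockingQueue<T> queue, SuccessPolicy<T> policy) {
       while (batches.hasNext() && !Thread.currentThread().isInterrupted()) {
         List<T> batch = batches.next();
         try {
           for (T record : batch) {
             queue.put(record);
           }
         } catch (InterruptedException e) {
           // Conventionally a blocking call that throws InterruptedException has
           // already cleared the interrupt flag, so restore it before finally checks it.
           Thread.currentThread().interrupt();
         } finally {
           if (!Thread.currentThread().isInterrupted()) {
             // Reached for empty batches too, so an idle shard still gets rate limited.
             policy.onSuccess(batch);
           }
         }
       }
     }
   }
   ```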

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601124858
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-599536461
 
 
   retest this please

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r393222652
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 ##########
 @@ -126,9 +126,21 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
   private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
 
 Review comment:
   I don't see an inherent problem with it: as long as the rate limiter is thread-safe, it can be shared. If you want, I could add a note to the Javadoc for RateLimitPolicyFactory indicating that it will be used from multiple threads, so if it returns a singleton instance of RateLimitPolicy, that instance should be thread-safe; otherwise, it should return separate RateLimitPolicy instances.
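   The contract proposed for the RateLimitPolicyFactory Javadoc can be illustrated with a small sketch. `CountingPolicy` and `PolicyFactories` are hypothetical, and `java.util.function.Supplier` stands in for the factory; this does not reproduce Beam's actual interfaces. Sharing one instance is only safe here because its state is an `AtomicLong`; the per-reader variant avoids shared state entirely.
   
   ```java
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.Supplier;
   
   /** Hypothetical policy that counts successful fetches; AtomicLong makes it safe to share. */
   final class CountingPolicy {
     private final AtomicLong successes = new AtomicLong();
   
     void onSuccess(List<?> batch) {
       successes.incrementAndGet();
     }
   
     long successes() {
       return successes.get();
     }
   }
   
   final class PolicyFactories {
     private PolicyFactories() {}
   
     /** Singleton: every reader thread gets the same (thread-safe) instance. */
     static Supplier<CountingPolicy> shared() {
       CountingPolicy singleton = new CountingPolicy();
       return () -> singleton;
     }
   
     /** Fresh instance per call, i.e. one per reader thread. */
     static Supplier<CountingPolicy> perReader() {
       return CountingPolicy::new;
     }
   }
   ```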

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391779556
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   I did some testing on my side, and I think the problem is using the same mock of `RateLimitPolicy` for both `ShardRecordsIterator`s, which are called from different threads. For example, when the failed assertion says that the "d" record is absent from `capturedRecords`, I actually see (using debug output) that it was read by `shardRecordsIterator.readNextBatch()` in `readLoop()`.
   Since real `RateLimitPolicy` instances will be created per `readLoop()` thread (right?), we need to test it the same way, with separate mock policies.
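   A sketch of the per-reader setup suggested here, using a hand-rolled recorder in place of Mockito mocks (`RecordingPolicy` and `PerReaderPolicies` are hypothetical names). Because each policy is called only from its own thread, the order of its recorded batches is fixed, so exact per-policy assertions no longer depend on thread scheduling.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   /** Hypothetical recorder standing in for one per-reader mock policy. */
   final class RecordingPolicy {
     final List<List<String>> successes = Collections.synchronizedList(new ArrayList<>());
   
     void onSuccess(List<String> batch) {
       successes.add(batch);
     }
   }
   
   final class PerReaderPolicies {
     private PerReaderPolicies() {}
   
     /** Replays each shard's batches on its own thread with its own policy. */
     static List<RecordingPolicy> run(List<List<List<String>>> batchesPerShard)
         throws InterruptedException {
       List<RecordingPolicy> policies = new ArrayList<>();
       List<Thread> threads = new ArrayList<>();
       for (List<List<String>> shardBatches : batchesPerShard) {
         RecordingPolicy policy = new RecordingPolicy();
         policies.add(policy);
         Thread t = new Thread(() -> shardBatches.forEach(policy::onSuccess));
         threads.add(t);
         t.start();
       }
       for (Thread t : threads) {
         t.join(); // join() also makes each thread's recorded batches visible here
       }
       return policies;
     }
   }
   ```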

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601123987
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r393167162
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 ##########
 @@ -126,9 +126,21 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
   private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
 
 Review comment:
   Don't you think that `rateLimiter` should not be shared across different reader threads?

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601120924
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601097324
 
 
   retest this please

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598122073
 
 
   retest this please

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-597027748
 
 
   Please fix the Spotless issue.

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-596970579
 
 
   retest this please

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391341830
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   > Would you share this?
   
   +1, please
   
   @suztomo What bug are you suggesting this exposes?
   

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598122164
 
 
   retest this please

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-598123021
 
 
   Run Java PreCommit

[GitHub] [beam] aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601123987
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] suztomo commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-599653019
 
 
   @aromanenko-dev You're right. `fetchedRecords` only contains 4 elements. My understanding of the test was incorrect.

[GitHub] [beam] jfarr commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-597400266
 
 
   > Thanks! Please, fix Spotless issue and add `[BEAM-9470]` as a prefix to commit message.
   
   @aromanenko-dev Sure, sorry about that. It's fixed now.

[GitHub] [beam] aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev removed a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-601124492
 
 
   Run Java_Examples_Dataflow PreCommit

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390376749
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
   Would you share your thoughts on the situation (the order of concurrent operations) in which customRateLimitPolicy did not receive `[d]` even though it received `[a, b]` and `[c]`?
   
   https://builds.apache.org/job/beam_PreCommit_Java_Commit/10277/testReport/junit/org.apache.beam.sdk.io.kinesis/ShardReadersPoolTest/shouldCallRateLimitPolicy/
   
   > Argument(s) are different! Wanted:
   > customRateLimitPolicy.onSuccess(
   >    [d]
   > );
   > -> at org.apache.beam.sdk.io.kinesis.ShardReadersPoolTest.shouldCallRateLimitPolicy(ShardReadersPoolTest.java:330)
   
   
   

[GitHub] [beam] jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
jfarr commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391389538
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
  @suztomo You have a good point. The intention of this test is to validate that every time readNextBatch() returns a list of records, onSuccess() gets called, and every time it throws `KinesisClientThrottledException`, onThrottle() gets called. I can see a couple of code paths where readNextBatch() could return records without onSuccess() getting called, namely if an exception is thrown in this section: https://github.com/apache/beam/blob/d62521f69ead4b58924043f041978d49e9beeb62/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java#L146-L149
   
   I haven't seen evidence that this is what's happening, but let's try my latest changes in ShardReadersPool and see if that fixes it. With this change it should not be possible for readNextBatch() to return without onSuccess() being called.
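  A minimal sketch of the ordering described above, assuming simplified, hypothetical stand-ins (`SketchRateLimitPolicy`, `SketchBatchSource`, `SketchThrottledException`, `ReadLoopSketch`) rather than Beam's real `RateLimitPolicy` or `ShardReadersPool`: the policy is notified immediately after readNextBatch() returns, before any per-record processing that could throw, so a returned batch cannot skip onSuccess().

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the types discussed above; these are NOT Beam's real classes.
interface SketchRateLimitPolicy {
  void onSuccess(List<String> records);

  void onThrottle(Exception e);
}

class SketchThrottledException extends Exception {
  SketchThrottledException(String message) {
    super(message);
  }
}

interface SketchBatchSource {
  List<String> readNextBatch() throws SketchThrottledException;
}

class ReadLoopSketch {
  /**
   * Performs {@code reads} calls to readNextBatch(). The policy is notified right after each
   * call returns and before the batch is handed to {@code sink}, so an exception thrown while
   * processing records cannot skip onSuccess().
   */
  static void readLoop(
      SketchBatchSource source, SketchRateLimitPolicy policy, Consumer<String> sink, int reads) {
    for (int i = 0; i < reads; i++) {
      List<String> batch;
      try {
        batch = source.readNextBatch();
      } catch (SketchThrottledException e) {
        policy.onThrottle(e); // throttled read: report it and try again
        continue;
      }
      policy.onSuccess(batch); // called first, unconditionally
      for (String record : batch) {
        sink.accept(record); // may throw; onSuccess() has already run
      }
    }
  }
}
```

  The key design choice is simply placement: anything that can fail after a successful read comes after the policy callback, not between the read and the callback.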

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390736457
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
  Thank you for the response. I still need your help here. Mockito is not supposed to check the order of method calls (unless [configured to use inOrder](https://stackoverflow.com/questions/21901368/mockito-verify-order-sequence-of-method-calls)). I checked its behavior with the following code:
  [Screenshot "Screen Shot 2020-03-11 at 12 03 34 AM": https://user-images.githubusercontent.com/28604/76381431-da928380-632b-11ea-97d1-0fc0bac12083.png]
   
  Is there something special about this test case that makes Mockito's `verify` order-sensitive?
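  To make the distinction concrete, here is a toy call recorder (hypothetical `CallLog`, not how Mockito is implemented) that roughly models the two semantics: a plain `verify(mock).onSuccess(x)` defaults to `times(1)` and checks that exactly one recorded call matched `x`, regardless of position, while an `InOrder` check looks for the expected calls as a subsequence. Under this model, a failure for `[d]` means no matching call was recorded at all, which is consistent with the `Argument(s) are different!` message quoted earlier, rather than a call arriving in the wrong order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy call recorder illustrating verification semantics; NOT Mockito's implementation. */
class CallLog<T> {
  // Synchronized because, as in the test under discussion, calls may come from several threads.
  private final List<T> calls = Collections.synchronizedList(new ArrayList<>());

  void record(T argument) {
    calls.add(argument);
  }

  /** Roughly like plain verify(mock).m(arg): exactly one matching call, position irrelevant. */
  boolean calledOnceWith(T argument) {
    synchronized (calls) {
      return Collections.frequency(calls, argument) == 1;
    }
  }

  /** Roughly like an InOrder check: expected arguments appear as a subsequence, in order. */
  boolean calledInOrder(List<T> expected) {
    synchronized (calls) {
      int next = 0;
      for (T call : calls) {
        if (next < expected.size() && call.equals(expected.get(next))) {
          next++;
        }
      }
      return next == expected.size();
    }
  }
}
```

  For example, after recording `[c]`, `[a, b]`, `[d]` in that order, `calledOnceWith` succeeds for each of them, but `calledInOrder([a, b], [c])` fails.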

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r393796823
 
 

 ##########
 File path: sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
 ##########
 @@ -126,9 +126,21 @@ void startReadingShards(Iterable<ShardRecordsIterator> shardRecordsIterators) {
   private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy rateLimiter) {
 
 Review comment:
  Yes, please add this to the Javadoc to make it clear to users. Other than that, I think we are fine.

[GitHub] [beam] aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev edited a comment on issue #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#issuecomment-599641222
 
 
   > The test case waits for 5 elements from shardReadersPool:
   
  @suztomo I think I disagree here: `fetchedRecords` will contain at most 4 elements, and they will always be "a", "b", "c" and "d", though possibly in a different order. Empty records won't be returned by `shardReadersPool.nextRecord()`; however, they will be passed to `rateLimiter.onSuccess()` as an argument (which I think is unnecessary).
  Let me explain how I understand this test. @jfarr, please correct me if I'm wrong. We need this loop to keep running while `fetchedRecords.size() < 4` only to make sure that the 2 reader threads have read all 4 records. Then we assert that `rateLimiter` behaved correctly along the way, since that is the real purpose of this test.
   
  I think the new version of this test with a timeout (last commit) should be fine. I tested it locally several times with 10k runs and have seen no failures so far.
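  A sketch of the bounded wait described above, assuming records arrive through a `BlockingQueue` (a hypothetical stand-in for `shardReadersPool.nextRecord()`; `BoundedFetchSketch` is illustrative only): the loop keeps polling while fewer than the expected number of records have arrived, but gives up with a `TimeoutException` once a deadline passes instead of hanging the build.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedFetchSketch {
  /**
   * Collects records until {@code expected} have arrived or {@code timeout} elapses.
   * Arrival order is not assumed, since two reader threads may interleave arbitrarily.
   */
  static List<String> fetchAtLeast(BlockingQueue<String> source, int expected, Duration timeout)
      throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + timeout.toNanos();
    List<String> fetched = new ArrayList<>();
    while (fetched.size() < expected) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        throw new TimeoutException("got " + fetched.size() + " of " + expected + " records");
      }
      // Block for at most the remaining time; poll() returns null if nothing arrived.
      String record = source.poll(remaining, TimeUnit.NANOSECONDS);
      if (record != null) {
        fetched.add(record);
      }
    }
    return fetched;
  }
}
```

  Assertions on the collected records should then compare them as a set (or with an order-insensitive matcher), not as an ordered list.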

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390902538
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
  How about writing one-off code that invokes this problematic test case, say, 100 times and counts the test failures? That would let us verify whether your version fixes the flakiness.
  
  I don’t expect that one-off code to be checked in, but attaching the observations and the code snippet to the PR would be helpful.
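  One way to write such a one-off loop without extra dependencies is sketched below (hypothetical `FlakinessCounter`; the test body is passed in as a plain `Runnable`). With JUnit 4 on the classpath, one could instead run a single test method through `JUnitCore` and `Request.method(...)` and read `Result.getFailureCount()`, but that variant is not shown here.

```java
class FlakinessCounter {
  /**
   * Runs {@code testCase} {@code runs} times and returns how many runs failed. A failed
   * assertion (AssertionError) or an unexpected RuntimeException both count as a failure.
   */
  static int countFailures(Runnable testCase, int runs) {
    int failures = 0;
    for (int i = 0; i < runs; i++) {
      try {
        testCase.run();
      } catch (AssertionError | RuntimeException e) {
        failures++;
      }
    }
    return failures;
  }
}
```

  Running the old and the new version of the test through the same loop gives a rough before/after failure rate, though a zero count over 100 runs is evidence rather than proof that the race is gone.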

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r391692082
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
  What I can guess for now is that `firstIterator` and `secondIterator` are executed in different threads (since `readLoop()` is called for every shard in a separate thread), and for some reason one thread **sometimes** gets priority over the other, so `secondIterator` doesn't get a chance to return the "d" record before `shardReadersPool` is closed.
  PS: Interestingly, it's much easier to reproduce on Jenkins than locally.
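  The race hypothesized above can be modeled with plain threads. In this sketch (hypothetical `TwoShardRaceSketch`; a list of strings per shard stands in for `firstIterator`/`secondIterator`, and setting `closed` stands in for closing `shardReadersPool`), each reader stops delivering once the pool is closed. Awaiting a `CountDownLatch` that every reader counts down before closing removes the race; closing immediately reproduces it nondeterministically.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TwoShardRaceSketch {
  /**
   * Starts one reader thread per shard. Each reader delivers its records until the pool is
   * closed. If waitForReaders is true, close happens only after every reader has finished;
   * otherwise it happens immediately (the racy version).
   */
  static List<String> run(List<List<String>> shards, boolean waitForReaders)
      throws InterruptedException {
    List<String> delivered = new CopyOnWriteArrayList<>();
    AtomicBoolean closed = new AtomicBoolean(false);
    CountDownLatch finished = new CountDownLatch(shards.size());
    ExecutorService pool = Executors.newFixedThreadPool(shards.size());
    for (List<String> shard : shards) {
      pool.submit(() -> {
        try {
          for (String record : shard) {
            if (closed.get()) {
              return; // pool closed: the remaining records of this shard are never seen
            }
            delivered.add(record); // stands in for rateLimiter.onSuccess(...)
          }
        } finally {
          finished.countDown(); // signal completion even on early return
        }
      });
    }
    if (waitForReaders && !finished.await(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("readers did not finish in time");
    }
    closed.set(true); // stands in for closing shardReadersPool
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    return delivered;
  }
}
```

  Only the `waitForReaders = true` path is deterministic; the other path is shown to illustrate why a test that closes the pool as soon as it has "enough" data can miss a slower thread's last batch.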

[GitHub] [beam] suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky

Posted by GitBox <gi...@apache.org>.
suztomo commented on a change in pull request #11090: [BEAM-9470] :sdks:java:io:kinesis:test is flaky
URL: https://github.com/apache/beam/pull/11090#discussion_r390902538
 
 

 ##########
 File path: sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
 ##########
 @@ -324,10 +325,15 @@ public void shouldCallRateLimitPolicy()
       }
     }
 
+    ArgumentCaptor<List<KinesisRecord>> recordsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(customRateLimitPolicy, atLeastOnce()).onSuccess(recordsCaptor.capture());
+    List<List<KinesisRecord>> capturedRecords = recordsCaptor.getAllValues();
+    assertThat(capturedRecords).contains(
+        ImmutableList.of(a, b),
+        singletonList(c),
+        singletonList(d),
+        Collections.emptyList()
+    );
     verify(customRateLimitPolicy).onThrottle(same(e));
-    verify(customRateLimitPolicy).onSuccess(eq(ImmutableList.of(a, b)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(c)));
-    verify(customRateLimitPolicy).onSuccess(eq(singletonList(d)));
 
 Review comment:
  How about writing one-off code that invokes the test, say, 100 times and counts the test failures? We can verify whether your version fixes the flakiness.
  
  I don’t expect that one-off code to be checked in, but attaching the observations and the code snippet to the PR would be helpful.
