You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2023/07/26 14:04:48 UTC

[beam] branch master updated: [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)

This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 21e040c14e4 [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
21e040c14e4 is described below

commit 21e040c14e4c1e6f0b993a3040ca304d800c126c
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Wed Jul 26 16:04:41 2023 +0200

    [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
---
 .../org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java   | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
index 02d28eaad58..82a31b9d037 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
@@ -111,12 +111,13 @@ public class KinesisIOReadTest {
   @Test
   public void testReadWithEFOFromShards() {
     SubscribeToShardEvent shard0event = eventWithRecords(3);
-    SubscribeToShardEvent shard1event = eventWithRecords(3);
-    SubscribeToShardEvent shard2event = eventWithRecords(3);
+    SubscribeToShardEvent shard1event = eventWithRecords(4);
+    SubscribeToShardEvent shard2event = eventWithRecords(5);
     EFOStubbedKinesisAsyncClient asyncClientStub = new EFOStubbedKinesisAsyncClient(10);
     asyncClientStub.stubSubscribeToShard("0", shard0event);
     asyncClientStub.stubSubscribeToShard("1", shard1event);
-    asyncClientStub.stubSubscribeToShard("2", shard1event);
+    asyncClientStub.stubSubscribeToShard("2", shard2event);
+
     MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class, asyncClientStub);
     Iterable<Record> expectedRecords =
         concat(shard0event.records(), shard1event.records(), shard2event.records());
@@ -128,7 +129,7 @@ public class KinesisIOReadTest {
             .withConsumerArn("consumer")
             .withInitialPositionInStream(TRIM_HORIZON)
             .withArrivalTimeWatermarkPolicy()
-            .withMaxNumRecords(9);
+            .withMaxNumRecords(12);
 
     PCollection<Record> result = p.apply(read).apply(ParDo.of(new KinesisIOReadTest.ToRecord()));
     PAssert.that(result).containsInAnyOrder(expectedRecords);