You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mm...@apache.org on 2023/07/26 14:04:48 UTC
[beam] branch master updated: [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21e040c14e4 [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
21e040c14e4 is described below
commit 21e040c14e4c1e6f0b993a3040ca304d800c126c
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Wed Jul 26 16:04:41 2023 +0200
[AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
---
.../org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
index 02d28eaad58..82a31b9d037 100644
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
+++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
@@ -111,12 +111,13 @@ public class KinesisIOReadTest {
@Test
public void testReadWithEFOFromShards() {
SubscribeToShardEvent shard0event = eventWithRecords(3);
- SubscribeToShardEvent shard1event = eventWithRecords(3);
- SubscribeToShardEvent shard2event = eventWithRecords(3);
+ SubscribeToShardEvent shard1event = eventWithRecords(4);
+ SubscribeToShardEvent shard2event = eventWithRecords(5);
EFOStubbedKinesisAsyncClient asyncClientStub = new EFOStubbedKinesisAsyncClient(10);
asyncClientStub.stubSubscribeToShard("0", shard0event);
asyncClientStub.stubSubscribeToShard("1", shard1event);
- asyncClientStub.stubSubscribeToShard("2", shard1event);
+ asyncClientStub.stubSubscribeToShard("2", shard2event);
+
MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class, asyncClientStub);
Iterable<Record> expectedRecords =
concat(shard0event.records(), shard1event.records(), shard2event.records());
@@ -128,7 +129,7 @@ public class KinesisIOReadTest {
.withConsumerArn("consumer")
.withInitialPositionInStream(TRIM_HORIZON)
.withArrivalTimeWatermarkPolicy()
- .withMaxNumRecords(9);
+ .withMaxNumRecords(12);
PCollection<Record> result = p.apply(read).apply(ParDo.of(new KinesisIOReadTest.ToRecord()));
PAssert.that(result).containsInAnyOrder(expectedRecords);