Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/20 16:40:28 UTC

[GitHub] [beam] mosche commented on a change in pull request #16286: [BEAM-13443] Avoid blocking put to Kinesis records queue to shutdown readers faster

mosche commented on a change in pull request #16286:
URL: https://github.com/apache/beam/pull/16286#discussion_r772513555



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardReadersPool.java
##########
@@ -145,8 +148,15 @@ private void readLoop(ShardRecordsIterator shardRecordsIterator, RateLimitPolicy
           List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
           try {
             for (KinesisRecord kinesisRecord : kinesisRecords) {
-              recordsQueue.put(kinesisRecord);
-              numberOfRecordsInAQueueByShard.get(kinesisRecord.getShardId()).incrementAndGet();
+              while (true) {
+                if (!poolOpened.get()) {

Review comment:
       The goal here is to stop as soon as possible so that shutting down the reader is not delayed. The loop that enqueues all new records can take a very long time, especially with aggregated records. It is therefore important to check whether the pool is still open before every new attempt to offer a record to the queue.
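
       To illustrate the idea, here is a minimal, self-contained sketch of the pattern, not the actual code of this pull request (the hunk above is truncated). The class `OfferLoopSketch`, the method `enqueueWhileOpen`, and the `offerTimeoutMs` parameter are hypothetical names chosen for illustration; only `poolOpened` and the idea of replacing a blocking `put` with repeated `offer` attempts come from the change under review.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names other than poolOpened are assumptions.
class OfferLoopSketch {

  /**
   * Tries to enqueue every record, re-checking the open flag before each offer attempt.
   * Returns the number of records enqueued before the batch was finished, the pool was
   * closed, or the thread was interrupted.
   */
  static <T> int enqueueWhileOpen(
      List<T> records, BlockingQueue<T> queue, AtomicBoolean poolOpened, long offerTimeoutMs) {
    int enqueued = 0;
    try {
      for (T record : records) {
        while (true) {
          if (!poolOpened.get()) {
            // Pool was closed: give up right away instead of blocking on a full queue.
            return enqueued;
          }
          // Bounded wait, unlike put(), so the flag is re-checked at least every timeout.
          if (queue.offer(record, offerTimeoutMs, TimeUnit.MILLISECONDS)) {
            enqueued++;
            break;
          }
        }
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt status for the caller and stop enqueuing.
      Thread.currentThread().interrupt();
    }
    return enqueued;
  }
}
```

       With a blocking `put`, a reader stuck on a full queue would only notice the shutdown once a consumer frees a slot. With a timed `offer`, the worst-case delay before the reader notices is roughly one timeout interval.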



