Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2019/08/07 18:17:40 UTC

[GitHub] [bahir] abhishekd0907 opened a new pull request #91: [BAHIR-213] Faster S3 file Source for Structured Streaming with SQS

URL: https://github.com/apache/bahir/pull/91
 
 
   ## What changes were proposed in this pull request?
   
   Using FileStreamSource to read files from an S3 bucket is problematic in terms of both cost and latency:
   
   - **Latency**: Listing all the files in an S3 bucket every microbatch can be both slow and resource-intensive.
   - **Costs**: Making List API requests to S3 every microbatch can be costly.
   
   The solution is to use Amazon Simple Queue Service (SQS), which lets you discover new files written to an S3 bucket without listing all the files every microbatch.
   
   S3 buckets can be configured to send notifications to an Amazon SQS queue on object create / object delete events. For details, see the AWS documentation: [Configuring S3 Event Notifications](https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html)
   
   Spark can leverage this to discover new files written to an S3 bucket by reading notifications from the SQS queue instead of listing files every microbatch.
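   
   To make the mechanism concrete, here is a minimal sketch, not the PR's actual code, of extracting file paths and timestamps from the standard S3 event notification JSON that S3 delivers to SQS. The `FileEvent` case class and `parseS3Event` helper are hypothetical, and json4s (which ships with Spark) is assumed for parsing:
   
   ```scala
   import java.net.URLDecoder
   import java.time.Instant
   import org.json4s._
   import org.json4s.jackson.JsonMethods._
   
   // Hypothetical helper: extracts (path, timestamp) pairs from one SQS message
   // body, assuming the standard S3 event notification schema.
   case class FileEvent(path: String, timestampMs: Long)
   
   def parseS3Event(messageBody: String): Seq[FileEvent] = {
     implicit val formats: Formats = DefaultFormats
     (parse(messageBody) \ "Records").children.flatMap { record =>
       val eventName = (record \ "eventName").extract[String]
       if (eventName.startsWith("ObjectCreated")) {
         val bucket = (record \ "s3" \ "bucket" \ "name").extract[String]
         // object keys arrive URL-encoded in S3 notifications
         val key = URLDecoder.decode(
           (record \ "s3" \ "object" \ "key").extract[String], "UTF-8")
         val ts = Instant.parse((record \ "eventTime").extract[String]).toEpochMilli
         Some(FileEvent(s"s3://$bucket/$key", ts))
       } else None
     }
   }
   ```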
   
   This PR adds a new `SqsSource` that uses an Amazon SQS queue to discover new files every microbatch.
   
   ## Usage
   ```scala
   val inputDf = spark
     .readStream
     .format("s3-sqs")
     .schema(schema)
     .option("fileFormat", "json")
     .option("sqsUrl", "https://QUEUE_URL")
     .option("region", "us-east-1")
     .load()
   ```
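   
   A stream defined this way is consumed with the standard Structured Streaming write API; the sink format, paths, and trigger below are placeholders:
   
   ```scala
   import org.apache.spark.sql.streaming.Trigger
   
   val query = inputDf.writeStream
     .format("parquet")                                       // any supported sink
     .option("path", "s3://output-bucket/data")               // hypothetical output location
     .option("checkpointLocation", "s3://output-bucket/chk")  // hypothetical checkpoint dir
     .trigger(Trigger.ProcessingTime("1 minute"))
     .start()
   ```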
   
   ## Implementation Details
   
   We create a scheduled thread that runs asynchronously with the streaming query thread and periodically fetches messages from the SQS queue. Key information, the file path and timestamp, is extracted from each SQS message, and the new files are stored in a thread-safe SQS file cache.
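   
   As a rough illustration of this design (not the PR's implementation), the fetcher could look like the sketch below. The cache layout, polling interval, and variable names are assumptions, and `parseS3Event` is the hypothetical helper sketched earlier:
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
   import scala.collection.JavaConverters._
   import com.amazonaws.services.sqs.AmazonSQSClientBuilder
   import com.amazonaws.services.sqs.model.ReceiveMessageRequest
   
   val sqs = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()
   val queueUrl = "https://QUEUE_URL"
   
   // Thread-safe file cache: path -> (timestampMs, SQS receipt handle).
   // The receipt handle is kept so the message can be deleted once the
   // corresponding file has been processed.
   val fileCache = new ConcurrentHashMap[String, (Long, String)]()
   
   val scheduler = Executors.newSingleThreadScheduledExecutor()
   scheduler.scheduleWithFixedDelay(new Runnable {
     override def run(): Unit = {
       val request = new ReceiveMessageRequest(queueUrl)
         .withMaxNumberOfMessages(10)
         .withWaitTimeSeconds(20) // long polling cuts down on empty receives
       sqs.receiveMessage(request).getMessages.asScala.foreach { msg =>
         parseS3Event(msg.getBody).foreach { event =>
           fileCache.putIfAbsent(event.path, (event.timestampMs, msg.getReceiptHandle))
         }
       }
     }
   }, 0, 10, TimeUnit.SECONDS)
   ```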
   
   The streaming query thread gets the files from the SQS file cache and filters out those already processed. Based on the maxFilesPerTrigger option, all or a subset of the new files are added to the offset log and marked as processed in the SQS file cache. The corresponding SQS messages for the processed files are deleted from the Amazon SQS queue, and the offset value is incremented and returned. A sketch of this flow follows the diagram below.
   
   ![SQS Design (1)](https://user-images.githubusercontent.com/43843989/59924375-3c842380-9453-11e9-9a4e-6f3fa6faf3fb.jpg)
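   
   A rough sketch of one microbatch on the query thread, under the same assumptions as the fetcher sketch above; `processedFiles` is a hypothetical set tracking which files have already entered the offset log:
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.collection.JavaConverters._
   import com.amazonaws.services.sqs.model.DeleteMessageRequest
   
   val processedFiles = ConcurrentHashMap.newKeySet[String]()
   
   def fetchBatch(maxFilesPerTrigger: Int): Seq[String] = {
     // oldest unprocessed files first, capped by maxFilesPerTrigger
     val batch = fileCache.asScala.toSeq
       .filterNot { case (path, _) => processedFiles.contains(path) }
       .sortBy { case (_, (ts, _)) => ts }
       .take(maxFilesPerTrigger)
   
     batch.foreach { case (path, (_, receiptHandle)) =>
       processedFiles.add(path) // mark processed in the cache
       // drop the corresponding message from the SQS queue
       sqs.deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle))
     }
     batch.map { case (path, _) => path } // these paths go into the offset log
   }
   ```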
   
   ## How was this patch tested?
   
   Added new unit tests in SqsSourceOptionsSuite that exercise the various SqsSource options. More tests will be added after initial feedback on the design approach and functionality.
