You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/06 17:57:08 UTC

[GitHub] [iceberg] SreeramGarlapati opened a new issue #2789: Implement rate limiting while reading stream from Iceberg table as Spark3 DSv2 source

SreeramGarlapati opened a new issue #2789:
URL: https://github.com/apache/iceberg/issues/2789


   This is a continuation of work done in PR #2660: reading from iceberg table as an incremental streaming source in spark.
   
   Here's an [idea proposal](https://github.com/apache/iceberg/pull/2660#discussion_r650153016) by @aokolnychyi - to use limit API once we move to 3.1.
   
   [Please refer to the Concepts section to fully understand the issue]
   
   ## The Problem
   Current implementation of Iceberg DSv2 micro_batch source - hands off all available data on the table to Spark3 DSv2. This could result in highly unpredictable/irregular stream sizes. This issue is an ask to explore - what are the concepts available in Iceberg - to rate limit the stream size.
   
   ## Implementation proposal
   
   Flow control can be maintained at `File Level` / `Record Level`.
   Iceberg table metadata layer mainly operates at `File Level` - i.e., each table is a list of data (/delete) files. 
   While ideal implementation of flow control should be at Record level - keeping in mind the ROI, complexity of implementation & our current requirement - we only need `File Level` flow control.
   
   Implementation idea is that:
   1. introduce a spark option to enable rate limiting - `max-bytes-per-micro-batch` - which switches to this implementation
   2. change the implementation of `SparkMicroBatchStream` to remember the tables stream position - the `latestReturnedOffset`
   3. change the implementation of `latestoffset()` to **NOT** return the iceberg tables' actual `latestOffset` - but, apply flow control and measure what the NEW `latestReturnedOffset` can be.
   4. change the OffsetStoreProvider to also add the `latestCommittedOffset` to be able to resume stream.
   
   ### Concepts needed to grok the above idea
   
   #### Background on Spark 3 - MicroBatchStream
   
   1. Spark expects micro batch streaming source to implement the interface `MicroBatchStream`
   2. Spark driver - as part of `MicroBatchExecution` - instantiates the Iceberg's `MicroBatchStream` implementation & invokes the **methods** in this `MicroBatchStream` class **in the below order**:
             - `latestOffset()` - asks Iceberg streaming source to return what is the `latestOffset` availabe on the stream 
             - `initialOffset()` - asks iceberg streaming source to return what is the first ever `initialOffset` when this stream started
             - `planInputPartitions(start, end)` - spark runtime picks a `start` offset and `end` offset based on the stream position it is at - & asks iceberg streaming source to return a dataStructure `InputPartition[]` - which will later be distributed across spark executors
             - `createReaderFactory()` - iceberg micro batch streaming source should implement this method - to educate spark executor - as to - how to read a given `InputPartition`
             - there are other standard methods on the `MicroBatchStream` interface - which will be invoked by spark - like `deserializeOffset`, `commit`, `stop` - which are self explanatory.
   
   #### Iceberg's Spark stream offset - StreamingOffset
   
   Spark expects the streaming source - to implement `Offset` class. `Offset` - is a logical representation of position of the stream - at which spark is reading.
   Iceberg's implementation (which already existed before this PR) is: `StreamingOffset`.
   It stores 3 properties about the Iceberg table position:
   1. **SnapshotId**: the Iceberg table Snapshot Id
   2. **position**: file position in a given iceberg snapshot
   3. **ScanAllFiles**: whether to stream 
       - All files in that Snapshot - which includes - (a) files added in older Snapshots + (b) **net NEW** files added in the current Snapshot
       - or to stream - only the files that are newly added in the current Snapshot.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] himanshpal commented on issue #2789: Implement rate limiting while reading stream from Iceberg table as Spark3 DSv2 source

Posted by GitBox <gi...@apache.org>.
himanshpal commented on issue #2789:
URL: https://github.com/apache/iceberg/issues/2789#issuecomment-875435777


   @SreeramGarlapati - Can the rate-limiting be also be extended to be used in batch usecases ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org