Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/09/27 11:19:01 UTC

[GitHub] [hudi] kazdy opened a new issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

kazdy opened a new issue #3724:
URL: https://github.com/apache/hudi/issues/3724


   **Describe the problem you faced**
   I wanted to query a Hudi dataset incrementally using Spark streaming and simply write the stream to the console with a trigger (processing time set to 3s).
   I got it working, but in the first batch I received all the data, starting from the first commit.
   I would like to start readStream from a specific commit time (as in the Flink streaming query that Hudi supports).
   
   I looked at the code and it seems there is no option I can specify to get this behavior.
   I know it's not documented yet and the work is in progress. Are you planning to add such functionality?
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create non-empty hudi dataset
   2. Use spark.readStream.format("hudi").load(basePath) on the data set
   3. Use spark.writeStream.format("console") to write batches with changing data to console
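
The three steps above can be sketched in PySpark roughly as follows. This is a minimal sketch rather than the code from the issue: `start_console_stream` and `trigger_interval` are hypothetical helper names, and it assumes an existing `SparkSession` with the Hudi Spark bundle on the classpath and a non-empty table at `base_path`.

```python
def trigger_interval(seconds: int) -> str:
    """Format a processing-time trigger interval string, e.g. '3 seconds'."""
    if seconds <= 0:
        raise ValueError("trigger interval must be positive")
    return f"{seconds} seconds"


def start_console_stream(spark, base_path: str, seconds: int = 3):
    """Stream a Hudi table to the console (hypothetical helper).

    `spark` is an existing SparkSession; `base_path` is the table's base path.
    """
    # Step 2: open the Hudi table as a streaming source.
    stream_df = spark.readStream.format("hudi").load(base_path)
    # Step 3: print each micro-batch to the console on a fixed trigger.
    return (
        stream_df.writeStream
        .format("console")
        .trigger(processingTime=trigger_interval(seconds))
        .start()
    )
```

Calling `start_console_stream(spark, base_path)` returns the running streaming query, so the caller decides whether to block on it with `awaitTermination()`.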
   
   **Expected behavior**
   
   I would like to be able to specify the commit time from which Hudi starts the stream of records (as in a Spark incremental query or a Flink streaming query).
   The first batch of data returned from spark.readStream.format("hudi").load(basePath) should start from the specified commit time.
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   
   * Spark version : 3.1.2
   
   * Hive version : -
   
   * Hadoop version : 3.2
   
   * Storage (HDFS/S3/GCS..) : local storage
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   What I'm trying to do is capture the changes happening in one Hudi dataset, then build an incremental pipeline in Spark to process them further.
   
   If there is currently a better way of doing this in Spark, could you please guide me?
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] kazdy commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-991237684


   Glad to hear that @alex-shchetkov :)
   There is one option that I want to try: `hoodie.datasource.read.incr.path.glob`
   I don't know exactly how it works yet, but maybe you'll find it useful :)
   
   Knowing how it works now, I'd keep commits for the last x days, and when reprocessing is needed I'd run it in batch mode, only for some partitions if needed.





[GitHub] [hudi] kazdy commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-986309728


   I'm closing this issue after reading the Hudi code. 
   
   Hudi's incremental query reads data only from commits that are still available, so you're not going to get all the data from the table (which was my concern): commit files are removed as new data arrives in the table (depending on the configuration). You just can't read the stream from the beginning of the table in all cases.
   
   For newcomers reading the Incremental Query section of the Spark Guide, this is not obvious.
   Spark Structured Streaming is not documented at all; this needs to be improved.
   
   The incremental query behavior that confused me is explained well here:
   https://hudi.apache.org/docs/configurations/#cleanretain_commits
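
To make the retention effect concrete, here is a toy model in Python. It is only a sketch under a strong simplifying assumption: that a table keeps exactly the N most recent commits readable. Real Hudi cleaning and archival are more involved, and `earliest_readable_commit` and `commits_retained` are hypothetical names standing in for the retention setting linked above.

```python
def earliest_readable_commit(commit_times, commits_retained):
    """Return the oldest commit still available under a keep-last-N policy.

    Toy model only: it does not reproduce Hudi's actual cleaner or archival logic.
    """
    if commits_retained <= 0:
        raise ValueError("commits_retained must be positive")
    # Commit times are fixed-width digit strings, so string order is time order.
    ordered = sorted(commit_times)
    if not ordered:
        return None
    # Keep the newest N commits; the first of those is the oldest readable one.
    return ordered[-commits_retained:][0]
```

With four commits and `commits_retained=2`, the function returns the third commit: anything older has dropped out of the window, so an incremental or streaming read cannot start from the very first commit.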
   
   I think Hudi is still missing some functionality when it comes to Spark Structured Streaming:
   - readStream from a given point in time,
   - readStream to a given point in time,
   - maxBytesPerTrigger,
   - maxRecordsPerTrigger.
   
   
   





[GitHub] [hudi] kazdy commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-954619903


   Thanks for the answer :)
   
   So there's no feature that allows setting something like a starting offset when using the streaming Hudi source, but checkpointing is available. As far as I understand the code, the streaming source does not use the configs provided under "hoodie.datasource", so they are simply ignored.
   
   What I'm after is doing the same thing as in Flink with read.streaming.start-commit (https://hudi.apache.org/docs/querying_data/#flink-sql).
   Do you think this is something that could be added to the Spark streaming Hudi source in the future?
   
   Regarding the DeltaStreamer, I think it might be the right solution.
   I'm just wondering whether I can add my own jar with a transform class that does the transformation. The docs mention that it's pluggable, but I haven't found anything more on that.
   If my thinking is correct, where does the jar go (I want to run it on EMR)?





[GitHub] [hudi] kazdy commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-954863193


   I think `hoodie.datasource.read.begin.instanttime` in the unit test is only used to assert that new data has been read from the source Hudi `sourcePath` and then written to `destPath` by the streaming query initialized by `initStreamingWriteFuture`.
   
   I tried these settings and it didn't start from the time I specified.
   
   I think that it's not configurable at all:
   `    metadataLog.get(0).getOrElse {
         metadataLog.add(0, INIT_OFFSET)
         INIT_OFFSET
       }`
   https://github.com/apache/hudi/blob/47ed91799943271f219419cf209793a98b3f09b5/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
   
   Where INIT_OFFSET is declared as:
   `val INIT_OFFSET = HoodieSourceOffset(HoodieTimeline.INIT_INSTANT_TS)`
   https://github.com/apache/hudi/blob/47ed91799943271f219419cf209793a98b3f09b5/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
   
   Which is:
   `  // Instant corresponding to pristine state of the table after its creation
     String INIT_INSTANT_TS = "00000000000000";`
     https://github.com/apache/hudi/blob/0223c442ec9a746834d1b2f2582c5267b692823a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
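
The effect of the quoted snippets can be paraphrased in Python. This is a sketch, not Hudi's implementation: `initial_offset` and `commits_in_first_batch` are hypothetical names, a plain dict stands in for the metadata log, and it assumes the first batch covers every commit after the initial offset, which matches the behavior reported in this issue.

```python
INIT_INSTANT_TS = "00000000000000"  # value quoted from HoodieTimeline above


def initial_offset(metadata_log: dict) -> str:
    """Paraphrase of the quoted getOrElse: reuse batch 0's offset or store INIT."""
    if 0 not in metadata_log:
        # No user-supplied start commit is consulted here.
        metadata_log[0] = INIT_INSTANT_TS
    return metadata_log[0]


def commits_in_first_batch(commit_times, metadata_log):
    """Commits strictly after the initial offset (instants compare as strings)."""
    start = initial_offset(metadata_log)
    return [c for c in sorted(commit_times) if c > start]
```

Because every real commit time sorts after `"00000000000000"`, a fresh query with an empty log picks up all available commits in its first batch.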
     
   
   > Yes that's correct. On EMR, you need to place it under `/usr/lib/hudi`
   
   So when running EMR on EKS I'd need to provide a custom container image to do this.
   








[GitHub] [hudi] codope commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
codope commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-954836086


   > What I'm after is doing same thing as in Flink using read.streaming.start-commit (https://hudi.apache.org/docs/querying_data/#flink-sql).
   > Do you think this is something that could be added to spark streaming hudi source in the future?
   
   Wouldn't setting the end instant time to the current time and the begin instant time to current - avg_processing_time be sufficient? `hoodie.datasource.read.begin.instanttime` and `hoodie.datasource.read.end.instanttime` should be usable with Structured Streaming as well. We can see that in the unit test, for example.
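
The begin/end window suggested here could be computed along these lines. This is a sketch under assumptions: `instant_window` is a hypothetical helper, the 14-digit `yyyyMMddHHmmss` instant layout is assumed from the commit times used in this thread, and `now` is assumed to be in the same time zone the table's commits were written in.

```python
from datetime import datetime, timedelta

INSTANT_FORMAT = "%Y%m%d%H%M%S"  # assumed 14-digit layout (yyyyMMddHHmmss)


def instant_window(now: datetime, avg_processing_time: timedelta) -> dict:
    """Build begin/end instant options for [now - avg_processing_time, now]."""
    begin = now - avg_processing_time
    return {
        "hoodie.datasource.read.begin.instanttime": begin.strftime(INSTANT_FORMAT),
        "hoodie.datasource.read.end.instanttime": now.strftime(INSTANT_FORMAT),
    }
```

The returned dict can be passed to a reader with `.options(**instant_window(...))`. Whether the streaming source honors these options is a separate question that this sketch does not settle.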
   
   > If my thinking is correct, where the jar goes (i want to run in on emr)?
   
   Yes that's correct. On EMR, you need to place it under `/usr/lib/hudi`





[GitHub] [hudi] alex-shchetkov commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
alex-shchetkov commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-991137826


   This really threw me for a loop. Thank you for looking into this and clarifying it, @kazdy; otherwise I would've spent far longer fiddling with the properties to make it read the entire Hudi table.
   
   The only workaround I found for reading the entire table during the first readStream trigger is to rewrite the original table with itself. This creates a new commit containing all records, but since there's no "max{Bytes|Records}PerTrigger", it can be hard to process downstream.








[GitHub] [hudi] codope commented on issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
codope commented on issue #3724:
URL: https://github.com/apache/hudi/issues/3724#issuecomment-931404620


   Generally, for incremental queries we need to set the following configs:
   ```
   "hoodie.datasource.query.type" : "incremental",
   "hoodie.datasource.read.begin.instanttime" : "commit_time_to_read_from"
   ```
   Did you try using these configs? You can also take a look at [TestStructuredStreaming](https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala#L101) for example usage.
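
For a batch (non-streaming) incremental read, the two configs above might be applied from PySpark as in the sketch below. `incremental_read_options` and `read_incremental` are hypothetical helper names, and the sketch assumes an existing `SparkSession` with the Hudi bundle available.

```python
def incremental_read_options(begin_instant: str) -> dict:
    """Options for an incremental query starting after `begin_instant`."""
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }


def read_incremental(spark, base_path: str, begin_instant: str):
    """Batch-read the records committed after `begin_instant` (hypothetical helper)."""
    return (
        spark.read.format("hudi")
        .options(**incremental_read_options(begin_instant))
        .load(base_path)
    )
```

Keeping the option dict in its own function makes it easy to inspect or log exactly which instant a job was asked to start from.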
   
   > What I'm trying to do is to obtain changes that are happening in one hudi dataset to then create incremental pipeline in spark and process them further.
   
   For this, I would also suggest taking a look at HoodieIncrSource and setting up a DeltaStreamer job using that source. For an example, take a look at [TestHoodieDeltaStreamer](https://github.com/apache/hudi/blob/47ed91799943271f219419cf209793a98b3f09b5/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java#L1225).





[GitHub] [hudi] kazdy closed issue #3724: [SUPPORT] Spark start reading stream from hudi dataset starting from given commit time

Posted by GitBox <gi...@apache.org>.
kazdy closed issue #3724:
URL: https://github.com/apache/hudi/issues/3724


   

