Posted to commits@hudi.apache.org by "Dave Hagman (Jira)" <ji...@apache.org> on 2022/02/22 15:17:00 UTC

[jira] [Commented] (HUDI-1307) spark datasource load path format is confused for snapshot and increment read mode

    [ https://issues.apache.org/jira/browse/HUDI-1307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496163#comment-17496163 ] 

Dave Hagman commented on HUDI-1307:
-----------------------------------

[~vinoth] [~309637554] [~xushiyan] I'd like to +1 what [~vho] said. We heavily use Hive-style partitioning with glob paths so that Spark can optimize the query plan for us.

 

I want to confirm that this issue is not recommending the removal of support for queries such as these:

 
{code:python}
# Single load
spark.read \
    .format("org.apache.hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .option("hoodie.file.index.enable", False) \
    .load("file://example/hudi-table/year=2022/month=2/day=*/hour=*")

# Load multiple years
read_paths = [f"{datalake_uri}/year=2020/month=*/day=*/hour=*", f"{datalake_uri}/year=2021/month=*/day=*/hour=*", ...]

base_df = spark.read \
    .format("org.apache.hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .option("hoodie.file.index.enable", False) \
    .option("hoodie.datasource.read.paths", ",".join(read_paths)) \
    .load()
{code}
In these examples we have a table partitioned by year/month/day/hour. 
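For context, a table laid out this way is written with Hive-style partitioning enabled on the year/month/day/hour fields. A minimal write-side sketch (the table name, record key, and precombine field here are hypothetical placeholders, not our real configs):
{code:python}
# Hypothetical write config for a year/month/day/hour Hive-style partitioned
# table; "events", "uuid", and "ts" are placeholder names.
df.write \
    .format("org.apache.hudi") \
    .option("hoodie.table.name", "events") \
    .option("hoodie.datasource.write.recordkey.field", "uuid") \
    .option("hoodie.datasource.write.precombine.field", "ts") \
    .option("hoodie.datasource.write.partitionpath.field", "year,month,day,hour") \
    .option("hoodie.datasource.write.hive_style_partitioning", "true") \
    .mode("append") \
    .save(datalake_uri)
{code}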

I am currently testing Hudi 0.10.x and getting errors when constructing queries the way we used to in Hudi 0.9.0. Have changes already been made to the way the default datasource parses glob patterns?

 

Example error:
{code:java}
java.lang.NullPointerException
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$globPath$2(HoodieSparkUtils.scala:82)
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$globPath$2$adapted(HoodieSparkUtils.scala:78)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:256)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
	at scala.collection.mutable.ArrayOps$ofRef.filterImpl(ArrayOps.scala:198)
	at scala.collection.TraversableLike.filterNot(TraversableLike.scala:355)
	at scala.collection.TraversableLike.filterNot$(TraversableLike.scala:355)
	at scala.collection.mutable.ArrayOps$ofRef.filterNot(ArrayOps.scala:198)
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$globPath$1(HoodieSparkUtils.scala:78)
	at scala.Option.map(Option.scala:230)
	at org.apache.hudi.HoodieSparkUtils$.globPath(HoodieSparkUtils.scala:77)
	at org.apache.hudi.HoodieSparkUtils$.globPathIfNecessary(HoodieSparkUtils.scala:97)
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$checkAndGlobPathIfNecessary$1(HoodieSparkUtils.scala:111)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.hudi.HoodieSparkUtils$.checkAndGlobPathIfNecessary(HoodieSparkUtils.scala:109)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:97)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:69)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
... {code}
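In the meantime, the workaround I am evaluating is the predicate-based style this issue proposes: load the base path without any glob and express the partition filters as predicates instead. A sketch, assuming the file index is left enabled (which I believe is the default in 0.10.x) so partition pruning can kick in:
{code:python}
# Sketch of the predicate-based alternative: no glob in the load path,
# partition pruning via filters instead. Assumes hoodie.file.index.enable
# is left at its default (true) so the file index can prune partitions.
base_df = spark.read \
    .format("org.apache.hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .load(datalake_uri) \
    .where("year in (2020, 2021)")
{code}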
 

 

> spark datasource load path format is confused for snapshot and increment read mode
> ----------------------------------------------------------------------------------
>
>                 Key: HUDI-1307
>                 URL: https://issues.apache.org/jira/browse/HUDI-1307
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: reader-core, spark
>            Reporter: liwei
>            Assignee: liwei
>            Priority: Major
>              Labels: hudi-on-call, query-eng, sev:high, user-support-issues
>
> When the Spark datasource reads a Hudi table:
> 1. Snapshot mode
> {code:java}
>  val readHudi = spark.read.format("org.apache.hudi").load(basePath + "/*")
> {code}
> The "/*" must be added, otherwise the load fails: org.apache.hudi.DefaultSource.createRelation() uses fs.globStatus(), and without "/*" the glob does not match the .hoodie and default directories.
> {code:java}
>  val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
> {code}
>  
> 2. Incremental mode
> Both basePath and basePath + "/*" work. This is because in org.apache.hudi.DefaultSource,
> DataSourceUtils.getTablePath supports both formats.
> {code:java}
>  val incViewDF = spark.read.format("org.apache.hudi").
>  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
>  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
>  option(END_INSTANTTIME_OPT_KEY, endTime).
>  load(basePath){code}
>  
> {code:java}
>  val incViewDF = spark.read.format("org.apache.hudi").
>  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
>  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
>  option(END_INSTANTTIME_OPT_KEY, endTime).
>  load(basePath + "/*")
>  {code}
>  
> Since incremental mode and snapshot mode do not behave consistently, users get confused. Loading with basePath + "/*" or "/*/*" is also confusing. I know this is to support partitioned tables,
> but I think an API like the following would be clearer for users:
>  
> {code:java}
>  val partition = "year = '2019'"
>  spark.read.format("hudi").load(path).where(partition)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)