You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "nikoshet (via GitHub)" <gi...@apache.org> on 2023/03/31 15:32:55 UTC

[GitHub] [hudi] nikoshet opened a new issue, #8343: [SUPPORT] Hudi CustomKeyGenerator not working with timestamp based partitioned fields

nikoshet opened a new issue, #8343:
URL: https://github.com/apache/hudi/issues/8343

   **Describe the problem you faced**
   
   Hello, 
   I am experimenting with AWS DMS -> Hudi architecture using `DeltaStreamer` with parquet, and I want to partition the files in folders based on year, month and day, where the partition field is a column named _created_at_ that is of type _TIMESTAMP_ . When using a command to spawn the Spark Job and use partition only on year, it works as expected, and the folders inside the S3 bucket are partitioned correctly. 
   
   The command is:
   ```bash
   spark-submit \
   --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
   --master k8s://http://localhost:8001 --deploy-mode cluster \
   --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.kubernetes.namespace=spark \
   --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
   --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
   --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
   --conf spark.ui.port=4040 \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
   --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
   --table-type COPY_ON_WRITE --op BULK_INSERT \
   --target-base-path s3a://cdc-spike/hudi/postgres/employee \
   --target-table employee \
   --min-sync-interval-seconds 60 \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
   --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
   --source-ordering-field _dms_ingestion_timestamp \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
   ```
   
   Also, when I start a Spark SQL client and load the hudi table, it is read without any errors.
   
   
   However, when I am trying to run a command that will use `year,month,day` partitioning , thus changing this line:
   ```bash
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
   ```
   
   <details>
   <summary>
   i.e:</summary>
   <br>
   
   ```bash
   spark-submit \
   --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
   --master k8s://http://localhost:8001 --deploy-mode cluster \
   --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.kubernetes.namespace=spark \
   --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
   --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
   --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
   --conf spark.ui.port=4040 \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
   --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
   --table-type COPY_ON_WRITE --op BULK_INSERT \
   --target-base-path s3a://cdc-spike/hudi/postgres/employee \
   --target-table employee \
   --min-sync-interval-seconds 60 \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
   --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
   --source-ordering-field _dms_ingestion_timestamp \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
   --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
   ```
   
   </br>
   </details>
   
   The folders are split correctly on the S3 bucket, as shown below:
   
   ```bash
   ws s3 ls s3://cdc-spike/hudi/postgres/employee --recursive
   2023-03-31 18:22:27          0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.fileids/
   2023-03-31 18:22:26          0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.partitions/
   2023-03-31 18:22:22          0 hudi/postgres/employee/.hoodie/.schema/
   2023-03-31 18:23:51          0 hudi/postgres/employee/.hoodie/.temp/
   2023-03-31 18:23:49       1721 hudi/postgres/employee/.hoodie/20230331152242137.commit
   2023-03-31 18:22:45          0 hudi/postgres/employee/.hoodie/20230331152242137.commit.requested
   2023-03-31 18:23:10          0 hudi/postgres/employee/.hoodie/20230331152242137.inflight
   2023-03-31 18:22:23          0 hudi/postgres/employee/.hoodie/archived/
   2023-03-31 18:23:07        697 hudi/postgres/employee/.hoodie/hoodie.properties
   2023-03-31 18:22:55          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.fileids/
   2023-03-31 18:22:54          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.partitions/
   2023-03-31 18:22:50          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.schema/
   2023-03-31 18:23:45          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.temp/
   2023-03-31 18:23:04       5615 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit
   2023-03-31 18:23:03        121 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight
   2023-03-31 18:23:00          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.requested
   2023-03-31 18:23:42       6712 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit
   2023-03-31 18:23:34       1502 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.inflight
   2023-03-31 18:23:30          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.requested
   2023-03-31 18:22:51          0 hudi/postgres/employee/.hoodie/metadata/.hoodie/archived/
   2023-03-31 18:22:56        672 hudi/postgres/employee/.hoodie/metadata/.hoodie/hoodie.properties
   2023-03-31 18:22:58        124 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0
   2023-03-31 18:23:40      11050 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.2_0-10-10
   2023-03-31 18:23:36         93 hudi/postgres/employee/.hoodie/metadata/files/.hoodie_partition_metadata
   2023-03-31 18:23:15         96 hudi/postgres/employee/created_at=2023/03/31/.hoodie_partition_metadata
   2023-03-31 18:23:19     436276 hudi/postgres/employee/created_at=2023/03/31/30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet
   ```
   
   ,but with the Spark SQL client I get the following error:
   ```bash
   java.lang.ClassCastException: class org.apache.spark.unsafe.types.UTF8String cannot be cast to class java.lang.Long (org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app'; java.lang.Long is in module java.base of loader 'bootstrap')
   	at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
   	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
   	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
   	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
   	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:100)
   	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:269)
   	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:280)
   	at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:309)
   	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
   	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
   	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
   	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:136)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   	at java.base/java.lang.Thread.run(Unknown Source)
   ```
   
   The same error occurs when using spark shell and loading the table using DataFrames.
   
   A sample of the data in the Hudi parquet (seem to be ok) file are:
   <details>
   <summary> drop down</summary>
   
   <br>
   
   ```bash
   +-----------------------+------------------------+----------------------+--------------------------+------------------------------------------------------------------------+------+----------------------------+------+-------------+----------+----------------------------------+
   |   _hoodie_commit_time |   _hoodie_commit_seqno |   _hoodie_record_key | _hoodie_partition_path   | _hoodie_file_name                                                      | Op   | _dms_ingestion_timestamp   |   id | name        |   salary | created_at                       |
   |-----------------------+------------------------+----------------------+--------------------------+------------------------------------------------------------------------+------+----------------------------+------+-------------+----------+----------------------------------|
   |     20230331152242137 |  20230331152242137_0_0 |                    1 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492681 |    1 | Employee 1  |     2000 | 2023-03-31 14:13:40.973882+00:00 |
   |     20230331152242137 |  20230331152242137_0_1 |                    2 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492721 |    2 | Employee 2  |     5000 | 2023-03-31 14:13:40.973882+00:00 |
   |     20230331152242137 |  20230331152242137_0_2 |                    3 | created_at=2023/03/31    | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I    | 2023-03-31 13:06:13.492727 |    3 | Employee 3  |     1000 | 2023-03-31 14:13:40.973882+00:00 |
   ```
   
   </br>
   
   </details>
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a DMS instance that reads from an RDS and writes to S3
   2. Start a Hudi DeltaStreamer Job for a table with a _created_at_ column of type TIMESTAMP
   3. Start a Spark SQL client and load the Hudi table
   4. Run a `select` statement on the table
   
   **Expected behavior**
   
   I expected the Spark SQL client to show the table without any errors, since the data are partitioned correctly on S3.
   
   **Environment Description**
   
   * Hudi version : `0.13.0`
   
   * Spark version : `3.3.1`
   
   * Hive version : - 
   
   * Hadoop version : - 
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes, _Kubernetes_
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8343: [SUPPORT] Hudi CustomKeyGenerator not working with timestamp based partitioned fields

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8343:
URL: https://github.com/apache/hudi/issues/8343#issuecomment-1578144077

   @nikoshet 
   Able to reproduce this issue. Its inferring the data type for partition column as long which is the root cause of the issue.
   
   Created the JIRA for the same - https://issues.apache.org/jira/browse/HUDI-6320
   
   Below is the gist with all the details to reproduce - https://gist.github.com/ad1happy2go/62b7bece4a1062e1023e105d49fdc235
   
   <img width="1074" alt="image" src="https://github.com/apache/hudi/assets/63430370/9e24e1f5-9b72-4e2a-969b-88823c2bacb2">
   
   
   <img width="1351" alt="image" src="https://github.com/apache/hudi/assets/63430370/3865a737-397f-4aae-9ade-6c61210f258a">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org