Posted to commits@hudi.apache.org by "stathismar (via GitHub)" <gi...@apache.org> on 2023/03/28 15:42:23 UTC
[GitHub] [hudi] stathismar opened a new issue, #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
stathismar opened a new issue, #8311:
URL: https://github.com/apache/hudi/issues/8311
**_Tips before filing an issue_**
- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
**Describe the problem you faced**
Hello!
I have the following setup and a question:
I have a Postgres database -> DMS -> Hudi pipeline that implements a CDC solution (right now I'm experimenting with a simple example table called `employee`).
Generally, this setup works fine. Now I want to add a new column with a default value. By default, Hudi creates the new column and starts ingesting values for it, but the existing records get `NULL`.
I would like to find a way to backfill the default value for all old records.
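A minimal Python model of that behavior, assuming the new column is nullable (as the `NULL` values in the output further down suggest): a record written before the column existed comes back with `NULL` (`None`) in it, while a record written afterwards carries the ingested value. The column list and the `read_with_columns` helper are hypothetical and only illustrate the semantics, not Hudi's reader.

```python
from typing import Any, Optional

# Hypothetical column list after `bonus` was added to the source table.
EVOLVED_COLUMNS = ["id", "name", "salary", "bonus"]


def read_with_columns(record: dict[str, Any], columns: list[str]) -> dict[str, Optional[Any]]:
    """Project a stored record onto the evolved columns; absent columns read as None (NULL)."""
    return {column: record.get(column) for column in columns}


# Written before the column existed, so it has no "bonus" field.
old_record = {"id": 3004870, "name": "Employee 1", "salary": 2000}
# Written after the column was added and populated from Postgres.
new_record = {"id": 3004881, "name": "Employee 12", "salary": 2000, "bonus": 200}

print(read_with_columns(old_record, EVOLVED_COLUMNS)["bonus"])  # None
print(read_with_columns(new_record, EVOLVED_COLUMNS)["bonus"])  # 200
```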
More specifically,
I have created the following simple table in Postgres:
```
id |name |salary|
-------+-----------+------+
3004870|Employee 1 | 2000|
3004871|Employee 2 | 5000|
...
3004879|Employee 10| 2000|
```
If I run `DeltaStreamer` in `BULK_INSERT` mode, I can see the same data in the Hudi table.
Then, if I insert a new row (i.e. `3004880|Employee 11| 1000|`) in the Postgres database and run `DeltaStreamer` in `UPSERT` mode, it continues working as expected:
```
20230328095858298 20230328095858298_0_0 3004872 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729440 3004872 Employee 3 1000
20230328095858298 20230328095858298_0_1 3004875 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729453 3004875 Employee 6 1000
20230328095858298 20230328095858298_0_2 3004878 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729464 3004878 Employee 9 1000
20230328100557913 20230328100557913_0_3 3004880 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 10:01:17.580412 3004880 Employee 11 1000
20230328095858298 20230328095858298_0_0 3004870 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729397 3004870 Employee 1 2000
20230328095858298 20230328095858298_0_1 3004873 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729445 3004873 Employee 4 2000
20230328095858298 20230328095858298_0_2 3004876 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729456 3004876 Employee 7 2000
20230328095858298 20230328095858298_0_3 3004879 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729470 3004879 Employee 10 2000
20230328095858298 20230328095858298_0_0 3004871 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729435 3004871 Employee 2 5000
20230328095858298 20230328095858298_0_1 3004874 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729449 3004874 Employee 5 5000
20230328095858298 20230328095858298_0_2 3004877 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729460 3004877 Employee 8 5000
```
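The first column above (`_hoodie_commit_time`) holds Hudi instant times. The 17-digit values appear to follow a `yyyyMMddHHmmssSSS` layout; assuming that layout, a small Python helper decodes them, which makes it easy to see that row `3004880` comes from a later commit than the bulk-inserted rows. The helper name is hypothetical.

```python
from datetime import datetime


def parse_instant_time(instant: str) -> datetime:
    """Decode a 17-digit Hudi instant time, assuming the yyyyMMddHHmmssSSS layout.

    The result is a naive datetime: the string carries no timezone information.
    """
    if len(instant) != 17 or not instant.isdigit():
        raise ValueError(f"unexpected instant time: {instant!r}")
    # %f accepts 1-6 digits, so the trailing 3 digits become milliseconds
    return datetime.strptime(instant, "%Y%m%d%H%M%S%f")


# Commit times taken from the _hoodie_commit_time column above.
for instant in ["20230328095858298", "20230328100557913"]:
    print(instant, "->", parse_instant_time(instant).isoformat(timespec="milliseconds"))
```

This prints `2023-03-28T09:58:58.298` for the bulk insert commit and `2023-03-28T10:05:57.913` for the commit that added `3004880`.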
After this, if I add a column with a default value (e.g. a `bonus` field) to the Postgres table:
```
id |name |salary|bonus|
-------+-----------+------+-----+
3004870|Employee 1 | 2000| 100|
...
3004880|Employee 11| 1000| 100|
```
and then add a single record (in the Postgres database):
```
3004881|Employee 12| 2000| 200|
```
and run `DeltaStreamer` in `UPSERT` mode, then in the Hudi table I have the following:
```
20230328095858298 20230328095858298_0_0 3004870 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet I 2023-03-28 09:57:04.729397 3004870 Employee 1 NULL 2000
20230328095858298 20230328095858298_0_1 3004873 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet I 2023-03-28 09:57:04.729445 3004873 Employee 4 NULL 2000
20230328095858298 20230328095858298_0_2 3004876 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet I 2023-03-28 09:57:04.729456 3004876 Employee 7 NULL 2000
20230328095858298 20230328095858298_0_3 3004879 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet I 2023-03-28 09:57:04.729470 3004879 Employee 10 NULL 2000
20230328101416065 20230328101416065_0_4 3004881 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet I 2023-03-28 10:12:21.784188 3004881 Employee 12 200 2000
20230328095858298 20230328095858298_0_0 3004872 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729440 3004872 Employee 3 NULL 1000
20230328095858298 20230328095858298_0_1 3004875 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729453 3004875 Employee 6 NULL 1000
20230328095858298 20230328095858298_0_2 3004878 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 09:57:04.729464 3004878 Employee 9 NULL 1000
20230328100557913 20230328100557913_0_3 3004880 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet I 2023-03-28 10:01:17.580412 3004880 Employee 11 NULL 1000
20230328095858298 20230328095858298_0_0 3004871 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729435 3004871 Employee 2 NULL 5000
20230328095858298 20230328095858298_0_1 3004874 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729449 3004874 Employee 5 NULL 5000
20230328095858298 20230328095858298_0_2 3004877 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet I 2023-03-28 09:57:04.729460 3004877 Employee 8 NULL 5000
```
What I would like to achieve is to backfill all of these `NULL` values with the default value (using Spark Shell or SparkSQL):
```
spark-sql> update employee set bonus=100 where id != 3004881;
23/03/28 13:37:34 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/03/28 13:38:32 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
23/03/28 13:39:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/03/28 13:39:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
Time taken: 139.231 seconds
spark-sql> select * from employee;
20230328133728279 20230328133728279_1_0 3004872 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet I 2023-03-28 09:57:04.729440 3004872 Employee 3 100 1000
20230328133728279 20230328133728279_1_1 3004875 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet I 2023-03-28 09:57:04.729453 3004875 Employee 6 100 1000
20230328133728279 20230328133728279_1_2 3004878 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet I 2023-03-28 09:57:04.729464 3004878 Employee 9 100 1000
20230328133728279 20230328133728279_1_3 3004880 salary=1000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet I 2023-03-28 10:01:17.580412 3004880 Employee 11 100 1000
20230328133728279 20230328133728279_0_0 3004870 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet I 2023-03-28 09:57:04.729397 3004870 Employee 1 100 2000
20230328133728279 20230328133728279_0_1 3004873 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet I 2023-03-28 09:57:04.729445 3004873 Employee 4 100 2000
20230328133728279 20230328133728279_0_2 3004876 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet I 2023-03-28 09:57:04.729456 3004876 Employee 7 100 2000
20230328133728279 20230328133728279_0_3 3004879 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet I 2023-03-28 09:57:04.729470 3004879 Employee 10 100 2000
20230328101416065 20230328101416065_0_4 3004881 salary=2000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet I 2023-03-28 10:12:21.784188 3004881 Employee 12 200 2000
20230328133728279 20230328133728279_2_0 3004871 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet I 2023-03-28 09:57:04.729435 3004871 Employee 2 100 5000
20230328133728279 20230328133728279_2_1 3004874 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet I 2023-03-28 09:57:04.729449 3004874 Employee 5 100 5000
20230328133728279 20230328133728279_2_2 3004877 salary=5000 ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet I 2023-03-28 09:57:04.729460 3004877 Employee 8 100 5000
Time taken: 15.774 seconds, Fetched 12 row(s)
```
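The `UPDATE` above selects rows by excluding the one id known to have arrived after the schema change. A hedged Python model of the same backfill shows that, for this data set, a predicate on the missing value itself (the equivalent of `WHERE bonus IS NULL`) touches the same 11 rows without needing to know which ids are new. The in-memory `rows` dictionary and the `backfill` helper are hypothetical and do not model how Hudi rewrites files.

```python
from typing import Callable, Optional

DEFAULT_BONUS = 100  # default value of the new Postgres column in this example

# id -> bonus as read from the Hudi table before the UPDATE: only 3004881 has a value.
rows: dict[int, Optional[int]] = {row_id: None for row_id in range(3004870, 3004881)}
rows[3004881] = 200

Predicate = Callable[[int, Optional[int]], bool]


def backfill(table: dict[int, Optional[int]], predicate: Predicate, value: int) -> dict[int, Optional[int]]:
    """Return a copy of `table` with `value` written to every row selected by `predicate`."""
    return {row_id: (value if predicate(row_id, bonus) else bonus) for row_id, bonus in table.items()}


by_id = backfill(rows, lambda row_id, bonus: row_id != 3004881, DEFAULT_BONUS)  # the UPDATE above
by_null = backfill(rows, lambda row_id, bonus: bonus is None, DEFAULT_BONUS)    # WHERE bonus IS NULL

print(by_id == by_null)                                                 # True
print(sum(1 for bonus in by_null.values() if bonus == DEFAULT_BONUS))  # 11
```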
Now if I insert a new record in Postgres:
```
3004882|Employee 13| 2000| 300|
```
and run `DeltaStreamer` in `UPSERT` mode (which completes successfully), I stop seeing changes in the Hudi table (I've tried all of `INSERT`, `UPDATE` and `DELETE`).
I suspect this has to do with the
```
update employee set bonus=100 where id != 3004881;
```
query I ran.
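One way to examine that suspicion is to decode the `_hoodie_file_name` column of the two `select` outputs above. Assuming Hudi's base-file naming pattern `<fileId>_<writeToken>_<instantTime>.parquet` (inferred from the names shown, not confirmed here), the sketch below maps each file group to the instant of its base file. It shows that the `UPDATE` wrote a new base file at instant `20230328133728279` for all three file groups, including the one holding `3004881`, whose `_hoodie_commit_time` is still `20230328101416065` in the output. The helper names are hypothetical.

```python
from typing import NamedTuple


class BaseFile(NamedTuple):
    file_id: str
    write_token: str
    instant_time: str


def parse_base_file_name(name: str) -> BaseFile:
    """Split a base-file name, assuming <fileId>_<writeToken>_<instantTime>.parquet."""
    stem, ext = name.rsplit(".", 1)
    if ext != "parquet":
        raise ValueError(f"not a parquet base file: {name!r}")
    # The fileId contains '-' but no '_', so splitting on '_' yields three parts.
    file_id, write_token, instant_time = stem.split("_")
    return BaseFile(file_id, write_token, instant_time)


def instant_by_file_group(names: list[str]) -> dict[str, str]:
    """Map each file group id to the instant time of its base file."""
    return {f.file_id: f.instant_time for f in map(parse_base_file_name, names)}


# One file name per file group, copied from the outputs before and after the UPDATE.
before_update = [
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet",
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet",
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet",
]
after_update = [
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet",
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet",
    "ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet",
]

before = instant_by_file_group(before_update)
after = instant_by_file_group(after_update)
for file_id in sorted(before):
    print(file_id, before[file_id], "->", after[file_id])
```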
I also see this warning in SparkSQL:
```
23/03/28 13:37:34 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
```
although I'm not sure whether it is related to the actual problem.
Can we write to a `DeltaStreamer`-managed table through SparkSQL?
**To Reproduce**
Steps to reproduce the behavior:
1. Create a Postgres instance
2. Create a DMS instance
3. Start a Hudi DeltaStreamer job for a table
4. Add a column to the Postgres table
5. Try to backfill the `NULL` Hudi data for the new column using SparkSQL
6. `DeltaStreamer` stops picking up changes
**Expected behavior**
I would have expected `DeltaStreamer` to keep fetching changes.
**Environment Description**
* Hudi version : `0.13.0`
* Spark version : `3.3.1`
* Hive version : -
* Hadoop version : -
* Storage (HDFS/S3/GCS..) : `S3`
* Running on Docker? (yes/no) : Yes, running on Kubernetes
**Additional context**
The command I use to launch the Spark job:
```bash
./spark-submit \
--jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
--master k8s://http://localhost:8001 --deploy-mode cluster \
--conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.kubernetes.namespace=hudi-example \
--conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod_templates/podTemplateExecutor.yaml \
--conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod_templates/podTemplateDriver.yaml \
--conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
--conf spark.ui.port=4040 \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
--table-type COPY_ON_WRITE --op UPSERT \
--target-base-path s3a://cdc-spike/hudi/postgres/employee \
--target-table employee \
--min-sync-interval-seconds 60 \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
--hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
--source-ordering-field _dms_ingestion_timestamp \
--hoodie-conf auto.offset.reset=earliest \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.partitionpath.field=salary
```
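The `--op UPSERT`, `--payload-class` and `--source-ordering-field` settings above decide how each DMS batch is merged into the table. A rough Python model of those semantics, under three assumptions: (1) duplicates of a key within one batch are resolved by the larger `_dms_ingestion_timestamp`, (2) an incoming record replaces the stored one, and (3) `AWSDmsAvroPayload` treats a record whose `Op` column is `D` as a delete. It is a simplification for reasoning about the expected table contents, not Hudi's implementation; the sample records and timestamps are hypothetical.

```python
from typing import Any

Record = dict[str, Any]


def precombine(batch: list[Record], key: str, ordering_field: str) -> dict[Any, Record]:
    """Keep one record per key within a batch: the one with the largest ordering value.

    Ties keep the first record seen, which is a simplification.
    """
    latest: dict[Any, Record] = {}
    for record in batch:
        current = latest.get(record[key])
        if current is None or record[ordering_field] > current[ordering_field]:
            latest[record[key]] = record
    return latest


def upsert(table: dict[Any, Record], batch: list[Record], key: str = "id",
           ordering_field: str = "_dms_ingestion_timestamp") -> dict[Any, Record]:
    """Apply a batch: incoming records replace stored ones, and Op == 'D' deletes the key."""
    result = dict(table)  # leave the input table untouched
    for record_key, record in precombine(batch, key, ordering_field).items():
        if record.get("Op") == "D":
            result.pop(record_key, None)
        else:
            result[record_key] = record
    return result


# Hypothetical sample data; timestamps use the same fixed-width format as the
# _dms_ingestion_timestamp column, so string comparison orders them correctly.
table = {
    3004870: {"id": 3004870, "Op": "I", "_dms_ingestion_timestamp": "2023-03-28 09:57:04.729397", "bonus": 100},
}
batch = [
    {"id": 3004882, "Op": "I", "_dms_ingestion_timestamp": "2023-03-28 14:00:00.000000", "bonus": 300},
    {"id": 3004882, "Op": "U", "_dms_ingestion_timestamp": "2023-03-28 14:01:00.000000", "bonus": 400},
    {"id": 3004870, "Op": "D", "_dms_ingestion_timestamp": "2023-03-28 14:02:00.000000", "bonus": 100},
]

result = upsert(table, batch)
print(sorted(result))            # [3004882]
print(result[3004882]["bonus"])  # 400
```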
**Stacktrace**
N/A: the `DeltaStreamer` run completes successfully, so there is no error to report.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] stathismar commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1488225736
@yihua If I can further help by providing more info or logs please ping me.
[GitHub] [hudi] yihua commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1487761250
Hi @stathismar, thanks for raising this. This seems unexpected. We'll need to reproduce it and find the root cause.
[GitHub] [hudi] stathismar commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1490438916
Hello again. I managed to overcome the issue. I started the SparkSQL console using the official Spark image from Docker Hub, i.e.:
```bash
docker run --env-file aws-env.txt -it apache/spark:v3.3.1 /opt/spark/bin/spark-sql \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.398,org.apache.hudi:hudi-aws-bundle:0.13.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp"
```
and ran exactly the same commands. **Everything worked as expected**!
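The issue does not say which options the original, failing local session passed. When comparing two invocations, a small Python helper can extract the `--conf key=value` pairs and report which of the three Hudi-related Spark SQL settings used in the working command above are missing or different. The helper names are hypothetical, only the space-separated `--conf key=value` form is handled, and a missing setting is a lead to check, not a confirmed root cause.

```python
import shlex

# Hudi-related Spark SQL settings passed in the working spark-sql invocation above.
REQUIRED_CONFS = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
}


def parse_confs(command: str) -> dict[str, str]:
    """Collect `--conf key=value` pairs from a single-line shell command."""
    tokens = shlex.split(command)  # handles single and double quotes like a POSIX shell
    confs: dict[str, str] = {}
    for flag, value in zip(tokens, tokens[1:]):
        if flag == "--conf" and "=" in value:
            key, _, val = value.partition("=")  # split on the first '=' only
            confs[key] = val
    return confs


def missing_confs(command: str) -> dict[str, str]:
    """Return required settings that are absent from `command` or set to a different value."""
    confs = parse_confs(command)
    return {key: value for key, value in REQUIRED_CONFS.items() if confs.get(key) != value}


working = (
    "spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' "
    "--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' "
    "--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'"
)
partial = "spark-sql --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'"

print(missing_confs(working))          # {}
print(sorted(missing_confs(partial)))  # ['spark.serializer', 'spark.sql.extensions']
```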
Locally I have (almost) the same Java/Scala versions as the ones in the [official Spark docker image](https://hub.docker.com/layers/apache/spark/v3.3.1/images/sha256-7b38e8ec4f04c8dbcba38ac74212a92aeff9a55d0068631df6fe65b1397b400d?context=explore) (my system has a newer point release of both Java and Scala, but I don't believe this should be a problem).
The other thing I suspected is that the Docker image runs as the user with UID `185`.
I tried `export SPARK_USER=185` locally, but that didn't help either.
Anyway, my issue is solved, and I don't think there is any reason to spend more time reproducing this, since it is most probably caused by some version incompatibility.
cc. @yihua
[GitHub] [hudi] stathismar closed issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar closed issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
URL: https://github.com/apache/hudi/issues/8311