Posted to commits@hudi.apache.org by "stathismar (via GitHub)" <gi...@apache.org> on 2023/03/28 15:42:23 UTC

[GitHub] [hudi] stathismar opened a new issue, #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table

stathismar opened a new issue, #8311:
URL: https://github.com/apache/hudi/issues/8311

   **Describe the problem you faced**
   
   
   Hello!
   I have the following setup and a question.
   I have a Postgres database -> DMS -> Hudi pipeline to implement a CDC solution (right now I'm experimenting with a simple example table called `employee`).
   Generally, this setup works fine. What I want to do is add a new column and apply a default value to all old records. By default, Hudi creates the new column and starts ingesting values for it.
   What I would like is a way to backfill this value for all old records.
   More specifically,
   I have created the following simple table in Postgres:
   ```
   id     |name       |salary|
   -------+-----------+------+
   3004870|Employee 1 |  2000|
   3004871|Employee 2 |  5000|
   ...
   3004879|Employee 10|  2000|
   ```
   If I run `DeltaStreamer` in `BULK_INSERT` mode, I can see the same data in the Hudi table.
   If I then insert a new row (i.e. `3004880|Employee 11|  1000|`) in the Postgres database and run `DeltaStreamer` in `UPSERT` mode, it continues working as expected:
   ```
   20230328095858298	20230328095858298_0_0	3004872	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729440	3004872	Employee 3	1000
   20230328095858298	20230328095858298_0_1	3004875	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729453	3004875	Employee 6	1000
   20230328095858298	20230328095858298_0_2	3004878	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729464	3004878	Employee 9	1000
   20230328100557913	20230328100557913_0_3	3004880	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 10:01:17.580412	3004880	Employee 11	1000
   20230328095858298	20230328095858298_0_0	3004870	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729397	3004870	Employee 1	2000
   20230328095858298	20230328095858298_0_1	3004873	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729445	3004873	Employee 4	2000
   20230328095858298	20230328095858298_0_2	3004876	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729456	3004876	Employee 7	2000
   20230328095858298	20230328095858298_0_3	3004879	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729470	3004879	Employee 10	2000
   20230328095858298	20230328095858298_0_0	3004871	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729435	3004871	Employee 2	5000
   20230328095858298	20230328095858298_0_1	3004874	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729449	3004874	Employee 5	5000
   20230328095858298	20230328095858298_0_2	3004877	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729460	3004877	Employee 8	5000
   ```
   After this, if I add a column (with a default value) in Postgres, e.g. a `bonus` field:
   ```
   id     |name       |salary|bonus|
   -------+-----------+------+-----+
   3004870|Employee 1 |  2000|  100|
   ...
   3004880|Employee 11|  1000|  100|
   ```
   and then add a single record (in the Postgres database):
   ```
   3004881|Employee 12|  2000|  200|
   ```
   and run `DeltaStreamer` in `UPSERT` mode, then in the Hudi table I have the following:
   ```
   20230328095858298	20230328095858298_0_0	3004870	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet	I	2023-03-28 09:57:04.729397	3004870	Employee 1	NULL	2000
   20230328095858298	20230328095858298_0_1	3004873	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet	I	2023-03-28 09:57:04.729445	3004873	Employee 4	NULL	2000
   20230328095858298	20230328095858298_0_2	3004876	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet	I	2023-03-28 09:57:04.729456	3004876	Employee 7	NULL	2000
   20230328095858298	20230328095858298_0_3	3004879	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet	I	2023-03-28 09:57:04.729470	3004879	Employee 10	NULL	2000
   20230328101416065	20230328101416065_0_4	3004881	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-17-13_20230328101416065.parquet	I	2023-03-28 10:12:21.784188	3004881	Employee 12	200	2000
   20230328095858298	20230328095858298_0_0	3004872	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729440	3004872	Employee 3	NULL	1000
   20230328095858298	20230328095858298_0_1	3004875	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729453	3004875	Employee 6	NULL	1000
   20230328095858298	20230328095858298_0_2	3004878	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 09:57:04.729464	3004878	Employee 9	NULL	1000
   20230328100557913	20230328100557913_0_3	3004880	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_0-17-13_20230328100557913.parquet	I	2023-03-28 10:01:17.580412	3004880	Employee 11	NULL	1000
   20230328095858298	20230328095858298_0_0	3004871	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729435	3004871	Employee 2	NULL	5000
   20230328095858298	20230328095858298_0_1	3004874	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729449	3004874	Employee 5	NULL	5000
   20230328095858298	20230328095858298_0_2	3004877	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_0-3-3_20230328095858298.parquet	I	2023-03-28 09:57:04.729460	3004877	Employee 8	NULL	5000
   ```
   What I would like to achieve is to backfill all of these `NULL` values with the default value (using Spark Shell or SparkSQL):
   ```
   spark-sql> update employee set bonus=100 where id != 3004881;
   23/03/28 13:37:34 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
   23/03/28 13:38:32 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
   23/03/28 13:39:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
   23/03/28 13:39:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
   Time taken: 139.231 seconds
   spark-sql> select * from employee;
   20230328133728279	20230328133728279_1_0	3004872	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet	I	2023-03-28 09:57:04.729440	3004872	Employee 3	100	1000
   20230328133728279	20230328133728279_1_1	3004875	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet	I	2023-03-28 09:57:04.729453	3004875	Employee 6	100	1000
   20230328133728279	20230328133728279_1_2	3004878	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet	I	2023-03-28 09:57:04.729464	3004878	Employee 9	100	1000
   20230328133728279	20230328133728279_1_3	3004880	salary=1000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-2_1-146-231_20230328133728279.parquet	I	2023-03-28 10:01:17.580412	3004880	Employee 11	100	1000
   20230328133728279	20230328133728279_0_0	3004870	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet	I	2023-03-28 09:57:04.729397	3004870	Employee 1	100	2000
   20230328133728279	20230328133728279_0_1	3004873	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet	I	2023-03-28 09:57:04.729445	3004873	Employee 4	100	2000
   20230328133728279	20230328133728279_0_2	3004876	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet	I	2023-03-28 09:57:04.729456	3004876	Employee 7	100	2000
   20230328133728279	20230328133728279_0_3	3004879	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet	I	2023-03-28 09:57:04.729470	3004879	Employee 10	100	2000
   20230328101416065	20230328101416065_0_4	3004881	salary=2000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-0_0-146-230_20230328133728279.parquet	I	2023-03-28 10:12:21.784188	3004881	Employee 12	200	2000
   20230328133728279	20230328133728279_2_0	3004871	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet	I	2023-03-28 09:57:04.729435	3004871	Employee 2	100	5000
   20230328133728279	20230328133728279_2_1	3004874	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet	I	2023-03-28 09:57:04.729449	3004874	Employee 5	100	5000
   20230328133728279	20230328133728279_2_2	3004877	salary=5000	ce6c1f0d-352b-440e-80d2-40a5dc989d3a-1_2-146-232_20230328133728279.parquet	I	2023-03-28 09:57:04.729460	3004877	Employee 8	100	5000
   Time taken: 15.774 seconds, Fetched 12 row(s)
   ```
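The query output above can also be checked programmatically. The following is a minimal Python sketch, assuming the first three tab-separated columns are Hudi's `_hoodie_commit_time`, `_hoodie_commit_seqno` and `_hoodie_record_key` meta fields, and that the commit time uses the 17-digit `yyyyMMddHHmmssSSS` format seen in the rows. The helper names `parse_instant` and `keys_by_commit` and the shortened sample rows are illustrative only. Grouping record keys by commit time shows which rows the `update` rewrote.

```python
from collections import defaultdict
from datetime import datetime

# Shortened copies of three rows from the output above; only the first three
# tab-separated columns are kept (assumed: commit time, seqno, record key).
SAMPLE_ROWS = [
    "20230328133728279\t20230328133728279_1_3\t3004880",
    "20230328133728279\t20230328133728279_0_0\t3004870",
    "20230328101416065\t20230328101416065_0_4\t3004881",
]


def parse_instant(instant):
    """Convert a 17-digit commit time (yyyyMMddHHmmssSSS) to a datetime."""
    if len(instant) != 17 or not instant.isdigit():
        raise ValueError(f"unexpected commit time format: {instant!r}")
    # Parse the second-resolution prefix, then attach the milliseconds.
    seconds = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    return seconds.replace(microsecond=int(instant[14:]) * 1000)


def keys_by_commit(rows):
    """Group record keys by the commit time that last wrote them."""
    grouped = defaultdict(list)
    for row in rows:
        commit_time, _seqno, record_key = row.split("\t")[:3]
        grouped[commit_time].append(record_key)
    return dict(grouped)


if __name__ == "__main__":
    for commit_time, keys in sorted(keys_by_commit(SAMPLE_ROWS).items()):
        print(parse_instant(commit_time).isoformat(), keys)
```

In the sample, only `3004881` still carries the earlier commit time `20230328101416065`, which matches the `where id != 3004881` filter of the update.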
   Now if I insert a new record in Postgres:
   ```
   3004882|Employee 13|  2000|  300|
   ```
   
   and run `DeltaStreamer` in `UPSERT` mode (which completes successfully), I stop seeing the updates in the Hudi table (I've tried all of `INSERT`, `UPDATE`, `DELETE`).
   I guess this has to do with the
   ```
   update employee set bonus=100 where id != 3004881;
   ```
   query I did. 
   I also see this warning in the SparkSQL:
   ```
   23/03/28 13:37:34 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
   ```
   which I'm not sure is related to the actual problem.
   Is it possible to write to a `DeltaStreamer` table through SparkSQL?
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a Postgres instance
   2. Create a DMS instance
   3. Start a Hudi DeltaStreamer job for a table
   4. Add a column to the Postgres table
   5. Try to backfill the Hudi `NULL` data for the column using SparkSQL
   6. DeltaStreamer will stop fetching changes
   
   **Expected behavior**
   
   I would have expected `DeltaStreamer` to keep fetching changes.
   
   **Environment Description**
   
   * Hudi version : `0.13.0`
   
   * Spark version : `3.3.1`
   
   * Hive version : -
   
   * Hadoop version : -
   
   * Storage (HDFS/S3/GCS..) : `S3`
   
   * Running on Docker? (yes/no) : Yes, Running on Kubernetes
   
   
   **Additional context**
   The command I use to spawn the Spark Job:
   ```bash
   ./spark-submit \
   --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
   --master k8s://http://localhost:8001 --deploy-mode cluster \
   --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.kubernetes.namespace=hudi-example \
   --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod_templates/podTemplateExecutor.yaml \
   --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod_templates/podTemplateDriver.yaml \
   --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
   --conf spark.ui.port=4040 \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
   --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
   --table-type COPY_ON_WRITE --op UPSERT  \
   --target-base-path s3a://cdc-spike/hudi/postgres/employee \
   --target-table employee \
   --min-sync-interval-seconds 60 \
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
   --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
   --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
   --source-ordering-field _dms_ingestion_timestamp \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=salary
   ```
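Because the same job is run first with `BULK_INSERT` and later with `UPSERT`, it can help to build the DeltaStreamer arguments in one place. Below is a minimal Python sketch that uses only the flags and values from the command above. The function name `deltastreamer_args` and the idea of wrapping the call this way are hypothetical, and the Spark/Kubernetes `--conf` options are left out for brevity.

```python
# Values copied from the spark-submit command above.
HOODIE_CONF = {
    "hoodie.deltastreamer.source.dfs.root": "s3a://cdc-spike/dms/public/employee/",
    "auto.offset.reset": "earliest",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.partitionpath.field": "salary",
}

# The two write operations used in this report.
ALLOWED_OPS = {"BULK_INSERT", "UPSERT"}


def deltastreamer_args(op):
    """Return the HoodieDeltaStreamer argument list for the given operation."""
    if op not in ALLOWED_OPS:
        raise ValueError(f"unsupported operation for this sketch: {op!r}")
    args = [
        "--table-type", "COPY_ON_WRITE",
        "--op", op,
        "--target-base-path", "s3a://cdc-spike/hudi/postgres/employee",
        "--target-table", "employee",
        "--min-sync-interval-seconds", "60",
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
        "--source-ordering-field", "_dms_ingestion_timestamp",
    ]
    # Each Hudi property becomes its own --hoodie-conf key=value pair.
    for key, value in HOODIE_CONF.items():
        args += ["--hoodie-conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(" ".join(deltastreamer_args("UPSERT")))
```

The returned list would be appended after the utilities bundle jar in the `spark-submit` call, so that only the `--op` value differs between the initial load and the later runs.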
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] stathismar commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table

Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1488225736

   @yihua If I can further help by providing more info or logs please ping me.




[GitHub] [hudi] yihua commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1487761250

   Hi @stathismar Thanks for raising this.  This seems to be unexpected.  We'll need to reproduce this and find the root cause.




[GitHub] [hudi] stathismar commented on issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table

Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar commented on issue #8311:
URL: https://github.com/apache/hudi/issues/8311#issuecomment-1490438916

   Hello again. I managed to overcome the issue. I started the SparkSQL console using the official Spark image from Docker Hub, i.e.:
   ```bash
   docker run --env-file aws-env.txt  -it apache/spark:v3.3.1 /opt/spark/bin/spark-sql \
   --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.398,org.apache.hudi:hudi-aws-bundle:0.13.0 \
   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
   --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp"
   ```
   and ran exactly the same commands. **Everything worked as expected**!
   Locally I have (almost) the same Java/Scala versions as the ones present in the [official Spark docker image](https://hub.docker.com/layers/apache/spark/v3.3.1/images/sha256-7b38e8ec4f04c8dbcba38ac74212a92aeff9a55d0068631df6fe65b1397b400d?context=explore) (my system has a newer point release of both Java and Scala, but I do not believe this should be a problem).
   The other thing I suspected is that the Docker image runs as the user with `UID 185`.
   I tried setting `export SPARK_USER=185` locally, but it didn't help either.
   
   Anyway, my issue is solved, and I do not think there is any reason to reproduce it and spend more time on this, since most probably it has to do with some version incompatibility.
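
Since a version incompatibility is suspected, one quick check is whether the bundles used by the two environments agree. The following is a minimal Python sketch that compares the `--jars` file names from the `spark-submit` command in the issue with the `--packages` coordinates from the `docker run` command above. The helper names are hypothetical, and the parsing assumes jar names of the form `<artifact>-<version>.jar` with no hyphen inside the version.

```python
# File names copied from the --jars list of the spark-submit command.
JARS = [
    "hudi-spark3.3-bundle_2.12-0.13.0.jar",
    "hudi-aws-bundle-0.13.0.jar",
    "aws-java-sdk-bundle-1.12.398.jar",
    "hadoop-aws-3.3.4.jar",
]

# Maven coordinates copied from the --packages list of the docker run command.
PACKAGES = [
    "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.398",
    "org.apache.hudi:hudi-aws-bundle:0.13.0",
]


def jar_versions(jars):
    """Map artifact name to version for '<artifact>-<version>.jar' names."""
    versions = {}
    for jar in jars:
        stem = jar[: -len(".jar")] if jar.endswith(".jar") else jar
        # Assumption: the version is everything after the last hyphen.
        artifact, version = stem.rsplit("-", 1)
        versions[artifact] = version
    return versions


def package_versions(packages):
    """Map artifact name to version for 'group:artifact:version' coordinates."""
    versions = {}
    for coordinate in packages:
        _group, artifact, version = coordinate.split(":")
        versions[artifact] = version
    return versions


def version_mismatches(jars, packages):
    """Return {artifact: (jar_version, package_version)} where they differ."""
    from_jars = jar_versions(jars)
    from_packages = package_versions(packages)
    return {
        artifact: (from_jars.get(artifact), from_packages.get(artifact))
        for artifact in sorted(set(from_jars) | set(from_packages))
        if from_jars.get(artifact) != from_packages.get(artifact)
    }


if __name__ == "__main__":
    print(version_mismatches(JARS, PACKAGES))
```

For the two lists in this thread the result is empty, so under these assumptions the Hudi and AWS bundle versions agree and any incompatibility would have to come from something else in the local setup.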
   
   cc. @yihua 




[GitHub] [hudi] stathismar closed issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table

Posted by "stathismar (via GitHub)" <gi...@apache.org>.
stathismar closed issue #8311: [SUPPORT] DeltaStreamer stops fetching changes after a SparkSQL Update query on its Hudi table
URL: https://github.com/apache/hudi/issues/8311

