You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/03 19:39:20 UTC

[GitHub] [hudi] mithalee commented on issue #3336: [SUPPORT] Delete not functioning with deltastreamer

mithalee commented on issue #3336:
URL: https://github.com/apache/hudi/issues/3336#issuecomment-892112742


   @codope Hi, I did try the HoodieDeltaStreamer on below version of EMR:
   Release label:emr-6.3.0
   Hadoop distribution:Amazon 3.2.1
   Applications:Tez 0.9.2, Spark 3.1.1, Hive 3.1.2, JupyterHub 1.2.0, Sqoop 1.4.7, Zeppelin 0.9.0, Hue 4.9.0, Presto 0.245.1
   HUDI Utility: https://mvnrepository.com/artifact/org.apache.hudi/hudi-utilities-bundle_2.12/0.8.0
   The HoodieDeltaStreamer works as expected(upserts/deletes).  But the same data set and same spark submit configuration does not work on the Spark on K8 binaries.
   https://spark.apache.org/docs/3.1.1/submitting-applications.html
   I can perform the initial insert into Hudi table through the below spark submit but the upserts/deletes are throwing error. Can you confirm if the upserts /deletes works using the DS in Spark on K8s.
   
   **SPARK SUBMIT**
   spark-submit --master k8s://https://...sk1.us-west-1.eks.amazonaws.com 
   --deploy-mode cluster 
   --name spark-hudi 
   --jars https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar 
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer 
   --conf spark.executor.instances=1 
   --conf spark.kubernetes.container.image=d.../spark:spark-hudi-0.2 
   --conf spark.kubernetes.namespace=spark-k8 
   --conf spark.kubernetes.container.image.pullSecrets=dockercloud-secret 
   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark 
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem 
   --conf spark.hadoop.fs.s3a.endpoint=s3.us-west-1.amazonaws.com 
   --conf spark.hadoop.fs.s3a.access.key='A........' 
   --conf spark.hadoop.fs.s3a.secret.key='L.........' 
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
   s3a://l...v/hudi-root/spark-submit-jars/hudi-utilities-bundle_2.12-0.8.0.jar 
   --table-type COPY_ON_WRITE --source-ordering-field one 
   --source-class org.apache.hudi.utilities.sources.ParquetDFSSource 
   --target-base-path s3a://......./hudi-root/transformed-tables/hudi_writer_mm7/ 
   --target-table test_table --base-file-format PARQUET 
   --hoodie-conf hoodie.datasource.write.recordkey.field=one 
   --hoodie-conf hoodie.datasource.write.partitionpath.field=two 
   --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://....../jen/example_4.parquet  
   
   **Parquet file example_4.parquet  :**
   import pyarrow.parquet as pq
   
   import numpy as np
   import pandas as pd
   import pyarrow as pa
   import uuid
   
   df = pd.DataFrame({'one': [-1, 3, 2.5],
   'two': [100, 200, 300],
   'three': [True, True, True],
   '_hoodie_is_deleted': [False, False, False]},
   index=list('abc'))
   table = pa.Table.from_pandas(df)
   print(table)
   pq.write_table(table, 'example_4.parquet')
   
   **Delete file:**
   import pyarrow.parquet as pq
   
   import numpy as np
   import pandas as pd
   import pyarrow as pa
   import uuid
   df = pd.DataFrame({'one': [-1, 3, 2.5],
   'two': [100, 200, 300],
   'three': [True, True, True],
   '_hoodie_is_deleted': [True, False, False]},
   index=list('abc'))
   table = pa.Table.from_pandas(df)
   print(table)
   pq.write_table(table, 'example_delete4.parquet')


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org