You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/07 00:38:32 UTC

[GitHub] [hudi] nsivabalan commented on issue #6341: [SUPPORT] Hudi delete not working via spark apis

nsivabalan commented on issue #6341:
URL: https://github.com/apache/hudi/issues/6341#issuecomment-1238781965

   I reused our quick start guide and could able to get it working. 
   ```
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   val tableName = "hudi_trips_cow"
   val basePath = "file:///tmp/hudi_trips_cow"
   val dataGen = new DataGenerator
   
   
   // spark-shell
   val inserts = convertToStringList(dataGen.generateInserts(10))
   val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     option(TABLE_NAME, tableName).
     option("hoodie.datasource.write.table.type","MERGE_ON_READ").
     mode(Overwrite).
     save(basePath)
   
     // spark-shell
   val tripsSnapshotDF = spark.
     read.
     format("hudi").
     load(basePath)
   tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
   
   spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
   spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
   
   
   spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
   
   // picked two random UUIDs from previous output. 
   
   val dfToDelete = spark.sql("select * from inputDf1 where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5') ")
   dfToDelete.show()
   
   
   dfToDelete.write.format("hudi").
     options(getQuickstartWriteConfigs).
     option(OPERATION_OPT_KEY,"delete").
     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     option(TABLE_NAME, tableName).
     option("hoodie.datasource.write.table.type","MERGE_ON_READ").
     mode(Append).
     save(basePath)
   
   val tripsSnapshotDF = spark.
     read.
     format("hudi").
     load(basePath)
   tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
   
   spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
   spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()
   
   
   spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot").show()
   
   
   spark.sql("select distinct partitionpath, uuid, fare from hudi_trips_snapshot where uuid in ('151d3208-18d7-4b88-9e8a-4994f44bc1a9','3bbf759d-e5d7-43f0-a924-01d253b263d5')").show(false)
   +-------------+----+----+
   |partitionpath|uuid|fare|
   +-------------+----+----+
   +-------------+----+----+
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org