You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/08 01:48:50 UTC

[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1240122505

   @santoshsb you need to enable schema evolution together with `hoodie.datasource.write.reconcile.schema`; see the following code:
   
   ```
     /**
      * Writes one JSON record to a Hudi table, then upserts a second record for the same
      * key whose `name` struct has no "prefix" field, printing the table schema after each
      * write so the two can be compared.
      *
      * @param spark session used to parse the JSON strings and to write/read the table
      */
     def perf(spark: SparkSession) = {
       // NOTE: functions._, DataSourceReadOptions and MultiPartKeysValueExtractor are not
       // referenced below; the imports are kept as in the original snippet.
       import org.apache.spark.sql.SaveMode
       import org.apache.spark.sql.functions._
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.DataSourceReadOptions
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.hudi.hive.MultiPartKeysValueExtractor

       val sqlContext = spark.sqlContext
       import sqlContext.implicits._ // brings .toDS into scope for Seq[String]

       // Single location used by every write and read in this method.
       val tablePath = "/work/data/updateTst/hudi/json_schema_tst"

       // Options shared by the initial insert and the later upsert.
       val commonWriteOptions = Map[String, String](
         HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")

       // --- Step 1: initial insert -------------------------------------------------
       // A trimmed-down Patient FHIR resource; name[0] carries a "prefix" array.
       val initialJson = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
       val initialDf = spark.read.json(Seq(initialJson).toDS)

       val insertWriter = initialDf.write
         .format("org.apache.hudi")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
         .options(commonWriteOptions)
         .mode(SaveMode.Overwrite)
       insertWriter.save(tablePath)

       // Read the table back and show the schema produced by the first write.
       val tableAfterInsert = spark.read.format("hudi").load(tablePath)
       tableAfterInsert.printSchema()

       // --- Step 2: upsert with a narrower schema ----------------------------------
       // Same record key, but name[0] no longer has "prefix" (the situation from the issue:
       // incoming FHIR data may add columns or omit existing ones).
       val updateJson = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""
       val updateDf = spark.read.json(Seq(updateJson).toDS)

       // Schema of the incoming batch, for comparison with the table schema.
       updateDf.printSchema()

       // Turn on schema-on-read for the session before the upsert.
       spark.sql("set hoodie.schema.on.read.enable=true")

       // Settings that apply to the upsert only; none of these keys appear in
       // commonWriteOptions, so layering them afterwards overrides nothing.
       // NOTE(review): EmptyHoodieRecordPayload is the payload class Hudi documents for
       // hard deletes -- confirm it is really intended for an upsert here.
       val upsertOnlyOptions = Map[String, String](
         DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> "org.apache.hudi.common.model.EmptyHoodieRecordPayload",
         "hoodie.datasource.write.reconcile.schema" -> "true")

       val upsertWriter = updateDf.write
         .format("org.apache.hudi")
         .options(commonWriteOptions)
         .options(upsertOnlyOptions)
         .mode(SaveMode.Append)
       upsertWriter.save(tablePath)

       // Re-read the table and show the schema after the second write.
       val tableAfterUpsert = spark.read.format("hudi").load(tablePath)
       tableAfterUpsert.printSchema()

     }
   ```
   patienthudiUpdated.schema:
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- prefix: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
   I think it should be OK, thanks.
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org