Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/27 18:48:07 UTC

[GitHub] [hudi] santoshsb opened a new issue, #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

santoshsb opened a new issue, #5452:
URL: https://github.com/apache/hudi/issues/5452

   Hi Team,
   
   We are currently evaluating Hudi for our analytical use cases, and as part of this exercise we are facing a few issues with schema evolution and data loss. The issue we have encountered occurs while updating a record. We first inserted a single record with the following schema:
   `
   root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)`
   
   Now we insert new data with the following schema:
   
   `root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)`
   
   The update is successful but the schema is missing the  
   ` |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)`
   
   field. Our expected behaviour was that, after adding the second entry, the new column "multipleBirthBoolean" would be added to the overall schema and the previous "maritalStatus" struct column would be retained, with a null value for the second entry. Instead, the final schema looks like this:
   `root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)`
   
   Basically, when a new entry is added that is missing a column from the destination schema, the update succeeds and the missing column vanishes from the previous entries. Let us know if we are missing any configuration options. We cannot control the schema, as it is defined by the FHIR standard (https://www.hl7.org/fhir/patient.html#resource); most of the fields there are optional, so the incoming data from our customers will be missing certain columns.
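
The behaviour expected above (keep `maritalStatus`, add `multipleBirthBoolean`, and use null wherever a record lacks a value) amounts to taking the union of the two column sets. A minimal Python sketch of that union over plain field names follows. `merge_columns` and `pad_record` are hypothetical helpers, the record values are made up, and nothing here is Hudi or Spark API; it only illustrates the target shape, not how Hudi reconciles schemas.

```python
# Hypothetical helpers for illustration only; not part of Hudi or Spark.

def merge_columns(table_columns, incoming_columns):
    """Return the table columns followed by any columns only the incoming data has."""
    merged = list(table_columns)
    for name in incoming_columns:
        if name not in merged:
            merged.append(name)
    return merged


def pad_record(record, columns):
    """Return a copy of `record` with every column present; absent ones become None."""
    return {name: record.get(name) for name in columns}


# Top-level field names taken from the two schemas in the report.
table_columns = ["birthDate", "gender", "id", "lastUpdated",
                 "maritalStatus", "resourceType", "source"]
incoming_columns = ["birthDate", "gender", "id", "lastUpdated",
                    "multipleBirthBoolean", "resourceType", "source"]

merged = merge_columns(table_columns, incoming_columns)

# Made-up update record that lacks maritalStatus.
update = {"id": "patient-1", "gender": "female", "multipleBirthBoolean": False}
padded = pad_record(update, merged)

print(merged)
print(padded)
```

In a Spark job the analogous step would be to add each missing column as a typed null before the upsert. Whether a Hudi configuration does this automatically is the open question of this issue.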
   
   **Environment Description**
   
   * Hudi version : 0.12.0-SNAPSHOT
   
   * Spark version : 3.2.1
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : Local
   
   * Running on Docker? (yes/no) : no
   
   Thanks for the help.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
codope commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1239058430

   Please try without this config `spark.hadoop.parquet.avro.write-old-list-structure`.
   cc @xiarixiaoyao 




[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1113004662

   @santoshsb That is strange; let me try it.




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1112864560

   Hi @xiarixiaoyao, thanks for the code. It worked like a charm for the reduced JSON provided above. After successfully testing it with the reduced schema, we used the complete schema (https://www.hl7.org/fhir/patient.html#resource). Even though the source and target schemas match, the following error is thrown while updating a record (both schemas are provided below for reference):
   
   `Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
     at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
     at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:646)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:314)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
     at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
     at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
     at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
     at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
     ... 67 elided
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
     at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
     at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
     at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
     at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
     at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
     at org.apache.spark.scheduler.Task.run(Task.scala:131)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
     ... 28 more
   Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:160)
     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
     ... 31 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:154)
     ... 32 more
   Caused by: org.apache.hudi.exception.HoodieException: operation has failed
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:134)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     ... 3 more
   Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     ... 4 more
   Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'coding' not found
     at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
     at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
     at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
     at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
     at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
     at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)`
     
     
   Current table schema is as follows,
   
   `scala> patienthudi.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- address: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- city: string (nullable = true)
    |    |    |-- country: string (nullable = true)
    |    |    |-- extension: array (nullable = true)
    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |-- extension: array (nullable = true)
    |    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |    |-- url: string (nullable = true)
    |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
    |    |    |    |    |-- url: string (nullable = true)
    |    |    |-- line: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- postalCode: string (nullable = true)
    |    |    |-- state: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- communication: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- language: struct (nullable = true)
    |    |    |    |-- coding: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |-- text: string (nullable = true)
    |-- extension: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- extension: array (nullable = true)
    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |-- url: string (nullable = true)
    |    |    |    |    |-- valueCoding: struct (nullable = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |    |-- valueString: string (nullable = true)
    |    |    |-- url: string (nullable = true)
    |    |    |-- valueAddress: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- state: string (nullable = true)
    |    |    |-- valueCode: string (nullable = true)
    |    |    |-- valueDecimal: double (nullable = true)
    |    |    |-- valueString: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- identifier: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- system: string (nullable = true)
    |    |    |-- type: struct (nullable = true)
    |    |    |    |-- coding: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |-- text: string (nullable = true)
    |    |    |-- value: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- managingOrganization: struct (nullable = true)
    |    |-- reference: string (nullable = true)
    |    |-- type: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- meta: struct (nullable = true)
    |    |-- extension: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- url: string (nullable = true)
    |    |    |    |-- valueString: string (nullable = true)
    |    |-- lastUpdated: string (nullable = true)
    |    |-- source: string (nullable = true)
    |    |-- versionId: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- multipleBirthInteger: long (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- prefix: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
    |-- telecom: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- system: string (nullable = true)
    |    |    |-- use: string (nullable = true)
    |    |    |-- value: string (nullable = true)
    |-- text: struct (nullable = true)
    |    |-- div: string (nullable = true)
    |    |-- status: string (nullable = true)`
   
   The incoming/update DataFrame schema, after applying the code you provided, is as follows:
   
   `scala> updatedStringDf.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- address: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- city: string (nullable = true)
    |    |    |-- country: string (nullable = true)
    |    |    |-- extension: array (nullable = true)
    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |-- extension: array (nullable = true)
    |    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |    |-- url: string (nullable = true)
    |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
    |    |    |    |    |-- url: string (nullable = true)
    |    |    |-- line: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- postalCode: string (nullable = true)
    |    |    |-- state: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- communication: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- language: struct (nullable = true)
    |    |    |    |-- coding: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |-- text: string (nullable = true)
    |-- extension: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- extension: array (nullable = true)
    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |-- url: string (nullable = true)
    |    |    |    |    |-- valueCoding: struct (nullable = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |    |-- valueString: string (nullable = true)
    |    |    |-- url: string (nullable = true)
    |    |    |-- valueAddress: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- state: string (nullable = true)
    |    |    |-- valueCode: string (nullable = true)
    |    |    |-- valueDecimal: double (nullable = true)
    |    |    |-- valueString: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- identifier: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- system: string (nullable = true)
    |    |    |-- type: struct (nullable = true)
    |    |    |    |-- coding: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |-- text: string (nullable = true)
    |    |    |-- value: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- managingOrganization: struct (nullable = true)
    |    |-- reference: string (nullable = true)
    |    |-- type: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- meta: struct (nullable = true)
    |    |-- extension: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- url: string (nullable = true)
    |    |    |    |-- valueString: string (nullable = true)
    |    |-- lastUpdated: string (nullable = true)
    |    |-- source: string (nullable = true)
    |    |-- versionId: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- multipleBirthInteger: long (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- prefix: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
    |-- telecom: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- system: string (nullable = true)
    |    |    |-- use: string (nullable = true)
    |    |    |-- value: string (nullable = true)
    |-- text: struct (nullable = true)
    |    |-- div: string (nullable = true)
    |    |-- status: string (nullable = true)`
   
   We have seen this issue in the troubleshooting guide, but that case applies when there is a schema mismatch; here both schemas are identical. Any pointers would be helpful.
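
The claim that the two `printSchema` dumps are identical can be checked mechanically instead of by eye. The sketch below uses Python's standard `difflib`; `schema_diff` is a hypothetical helper and the two short dumps are abbreviated stand-ins, not the full schemas above. Note that matching `printSchema` output only shows that the Spark-level schemas agree. The exception is raised by the Parquet/Avro reader, and differences at that level (for example, how lists are encoded in the files) would not show up in such a dump.

```python
import difflib


def schema_diff(left_dump, right_dump):
    """Return unified-diff lines between two printSchema dumps (empty list if equal)."""
    left = [line.rstrip() for line in left_dump.strip().splitlines()]
    right = [line.rstrip() for line in right_dump.strip().splitlines()]
    return list(difflib.unified_diff(left, right,
                                     fromfile="table", tofile="incoming",
                                     lineterm="", n=0))


# Abbreviated stand-ins for the dumps in this comment.
table_dump = """root
 |-- id: string (nullable = true)
 |-- maritalStatus: struct (nullable = true)
"""
incoming_same = table_dump
incoming_changed = """root
 |-- id: string (nullable = true)
"""

same = schema_diff(table_dump, incoming_same)
changed = schema_diff(table_dump, incoming_changed)

print("identical" if not same else "\n".join(same))
print("\n".join(changed))
```

An empty result supports the statement that the schemas match as Spark sees them; any `-` or `+` line pinpoints the field that differs.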
   
   Thanks,
   Santosh
   




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1134144289

   Thanks @xiarixiaoyao. Our schema for storing data, as defined by the FHIR standard (https://www.hl7.org/fhir/patient.schema.json.html), seems to be complicated: because most of the fields are optional, the incoming data will always be missing a few elements (nested as well as root-level). The missing root element is fixed by the code you provided. Thanks for looking into this issue.




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1117269863

   @xiarixiaoyao FYI, the createNewDF code throws the following error 
   `Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<string>
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_2$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_1$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_2$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_3$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
   	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)`
   
   This happens with the following data. Inserted record (schema simplified to highlight the issue):
   `{
     "resourceType": "Patient",
     "id": "beca9a29-49bb-40e4-adff-4dbb4d664972",
     "lastUpdated": "2022-02-14T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "name": [
       {
         "use": "official",
         "family": "Keeling57",
         "given": [
           "Serina556"
         ],
         "prefix": [
           "Ms."
         ]
       }
     ]
   }`
   
   Update 
   `{
     "resourceType": "Patient",
     "id": "beca9a29-49bb-40e4-adff-4dbb4d664972",
     "lastUpdated": "2022-02-14T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "name": [
       {
         "use": "official",
         "family": "Keeling57",
         "given": [
           "Serina556"
         ]
       }
     ]
   }`
   
   The second JSON is missing the prefix field, and based on createNewDF the update should add that column back with null (verified).
   
   Thanks,
   Santosh
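   
   A possible reading of this error (an editorial assumption, not confirmed in this thread): createNewDF copies each top-level value by position and leaves nested values untouched, so a `name` element from the update (`family`, `given`, `use`) is interpreted against the table's four-field struct (`family`, `given`, `prefix`, `use`), and the string in `use` lands in the `array<string>` slot of `prefix`. The sketch below models that positional assignment in plain Scala, without Spark; `PositionalMismatchDemo` and its members are hypothetical names.
   
   ```scala
   object PositionalMismatchDemo {
     // Layout of one `name` element in the table, in the alphabetical order
     // that Spark's JSON schema inference produces.
     val tableFields: Seq[(String, String)] = Seq(
       "family" -> "string",
       "given"  -> "array<string>",
       "prefix" -> "array<string>",
       "use"    -> "string")
   
     // Values of one `name` element inferred from the update JSON (no `prefix`).
     val incomingValues: Seq[(String, Any)] = Seq(
       "family" -> "Keeling57",
       "given"  -> Seq("Serina556"),
       "use"    -> "official")
   
     // Positional assignment: the i-th incoming value is written into the i-th table field,
     // regardless of the field name it originally belonged to.
     def assignByPosition: Seq[(String, String, Any)] =
       tableFields.zip(incomingValues).map { case ((field, tpe), (_, value)) => (field, tpe, value) }
   
     def main(args: Array[String]): Unit =
       assignByPosition.foreach { case (field, tpe, value) => println(s"$field: $tpe <- $value") }
   }
   ```
   
   In this model the third slot, `prefix: array<string>`, receives the string "official", which matches the shape of the reported exception.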




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1113405600

   @xiarixiaoyao thanks for helping out, let me know if you need any more information. 




[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1118100800

   @santoshsb 
   createNewDF cannot rewrite a DataFrame when the schema change is inside a nested field.
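   
   For readers who need the nested case, here is a minimal sketch of one way to align an incoming DataFrame to the table schema by field name, recursing into structs and arrays of structs. It is not part of Hudi and not what createNewDF does; `AlignToTableSchema`, `align`, and `alignTo` are hypothetical names, and the sketch assumes Spark 3.x, case-sensitive field names, and column names without dots. Nested incoming fields that the table does not have are dropped here.
   
   ```scala
   import org.apache.spark.sql.{Column, DataFrame}
   import org.apache.spark.sql.functions.{col, lit, struct, transform, when}
   import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
   
   // Hypothetical helper: reshapes a DataFrame so it matches `tableSchema` by field name.
   object AlignToTableSchema {
     // Build an expression that turns `source` (of type `actual`) into type `target`.
     def align(source: Column, actual: DataType, target: DataType): Column =
       (actual, target) match {
         case (a: StructType, t: StructType) =>
           val fields = t.fields.map { tf =>
             a.fields.find(_.name == tf.name) match {
               case Some(af) => align(source.getField(tf.name), af.dataType, tf.dataType).as(tf.name)
               case None     => lit(null).cast(tf.dataType).as(tf.name) // missing field becomes a typed null
             }
           }
           // Keep a null struct null instead of turning it into a struct of nulls.
           when(source.isNull, lit(null).cast(t)).otherwise(struct(fields: _*))
         case (ArrayType(actualElem, _), ArrayType(targetElem, _)) =>
           transform(source, elem => align(elem, actualElem, targetElem))
         case _ =>
           source.cast(target)
       }
   
     // Table columns first (null where absent), then incoming top-level columns the table lacks.
     def alignTo(df: DataFrame, tableSchema: StructType): DataFrame = {
       val tableCols = tableSchema.fields.map { tf =>
         df.schema.fields.find(_.name == tf.name) match {
           case Some(af) => align(col(tf.name), af.dataType, tf.dataType).as(tf.name)
           case None     => lit(null).cast(tf.dataType).as(tf.name)
         }
       }
       val newCols = df.schema.fieldNames
         .filterNot(n => tableSchema.fieldNames.contains(n))
         .map(n => col(n))
       df.select((tableCols ++ newCols): _*)
     }
   }
   ```
   
   A call would look like `AlignToTableSchema.alignTo(updatedStringDf, tableSchema)`. When `tableSchema` comes from reading the Hudi table, it probably makes sense to filter out the `_hoodie_*` meta columns first, for example with `StructType(patienthudi.schema.filterNot(_.name.startsWith("_hoodie_")))`.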




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1240290474

   @xiarixiaoyao thanks for the input, it works as expected with these options: `.option("hoodie.schema.on.read.enable", "true").option("hoodie.datasource.write.reconcile.schema", "true")`.
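   
   As a small illustration, the two settings can be kept in one map and merged into the table options before each write. This is only a sketch: `SchemaEvolutionOptions`, `baseOptions`, and `writeOptions` are hypothetical names, and `baseOptions` stands in for the full `hudiOptions` map used elsewhere in this thread.
   
   ```scala
   object SchemaEvolutionOptions {
     // The two keys reported to work in this comment.
     val schemaEvolution: Map[String, String] = Map(
       "hoodie.schema.on.read.enable" -> "true",
       "hoodie.datasource.write.reconcile.schema" -> "true")
   
     // Placeholder for the table options (record key, partition path, and so on).
     val baseOptions: Map[String, String] = Map("hoodie.table.name" -> "patient_hudi")
   
     // On a key clash, the right-hand map wins.
     val writeOptions: Map[String, String] = baseOptions ++ schemaEvolution
   }
   ```
   
   The merged map can then be passed with `.options(SchemaEvolutionOptions.writeOptions)` on the DataFrame writer.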




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111715248

   @xiarixiaoyao we are not concerned about the position, as long as the column is present in the schema (either as the last column or somewhere else) along with all the existing columns.




[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111963890

   @santoshsb please use the following code to solve your problem:
   ```
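       import scala.collection.mutable
       import org.apache.spark.sql.{DataFrame, Row, SaveMode}
       import org.apache.spark.sql.internal.SQLConf
       import org.apache.spark.sql.types.{StructField, StructType}
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.config.HoodieWriteConfig
   
       // Pad the incoming DataFrame with nulls so it carries every top-level column of the table.
       // `spark` is the active SparkSession (for example, the one provided by spark-shell).
       def createNewDF(df: DataFrame, oldTableSchema: StructType): DataFrame = {
         val writeSchema = df.schema
         // Table columns first, then incoming columns that the table does not have yet.
         val neededSchema = {
           val neededFields = mutable.ListBuffer[StructField]()
           oldTableSchema.foreach(neededFields.append(_))
           writeSchema.filterNot { col =>
             oldTableSchema.exists(targetCol => SQLConf.get.resolver(targetCol.name, col.name))
           }.foreach(neededFields.append(_))
           StructType(neededFields.toSeq)
         }
   
         // Target column index -> index of that column in the incoming schema (-1 if absent).
         val missingCols = {
           neededSchema.zipWithIndex.map { case (field, index) =>
             (index, writeSchema.indexWhere(p => SQLConf.get.resolver(p.name, field.name)))
           }.toMap
         }
   
         val filledRdd = df.rdd.mapPartitions { iter =>
           iter.map { row =>
             val tmp = new Array[Any](neededSchema.length)
             for (i <- tmp.indices) {
               // Scala's Map has getOrElse; getOrDefault exists only on java.util.Map.
               val index = missingCols.getOrElse(i, -1)
               tmp.update(i, if (index != -1) row.get(index) else null)
             }
             Row.fromSeq(tmp.toSeq)
           }
         }
         spark.createDataFrame(filledRdd, neededSchema)
       }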
   
       val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}"""
   
       val sqlContext = spark.sqlContext
       import sqlContext.implicits._
       val orgStringDf = spark.read.json(Seq(orgString).toDS())
   
       val hudiOptions = Map[String,String](
         HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
       )
   
       orgStringDf.write
         .format("org.apache.hudi")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
         .options(hudiOptions)
         .mode(SaveMode.Overwrite)
         .save("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
   
       val patienthudi = spark.read.format("hudi").load("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
   
   
       val updatedString = """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""
       val updatedStringDf = createNewDF(spark.read.json(Seq(updatedString).toDS), patienthudi.schema)
   
       updatedStringDf.write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
         .mode(SaveMode.Append)
         .save("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
       val patienthudi1 = spark.read.format("hudi").load("/tmp/default/clustering/updateTst/json_schema_tst/hudi")
       patienthudi1.select("id","gender","maritalStatus").show(false)
   ```
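   
   To make the resulting column order concrete, the sketch below applies the same top-level rule as `neededSchema` to a trimmed version of the two schemas from this issue: table columns keep their positions and unseen incoming columns are appended at the end. `MergedSchemaOrder` and `mergeTopLevel` are hypothetical names, the name match is case-sensitive (unlike the `SQLConf` resolver above), and `maritalStatus` is simplified to a string purely for illustration.
   
   ```scala
   import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
   
   object MergedSchemaOrder {
     // Table fields first, then incoming fields whose names the table does not have.
     def mergeTopLevel(table: StructType, incoming: StructType): StructType =
       StructType(table.fields ++ incoming.fields.filterNot(f => table.fieldNames.contains(f.name)))
   
     def main(args: Array[String]): Unit = {
       val table = StructType(Seq(
         StructField("birthDate", StringType),
         StructField("gender", StringType),
         StructField("id", StringType),
         StructField("maritalStatus", StringType))) // really a struct; simplified here
       val incoming = StructType(Seq(
         StructField("birthDate", StringType),
         StructField("gender", StringType),
         StructField("id", StringType),
         StructField("multipleBirthBoolean", BooleanType)))
       // prints: birthDate, gender, id, maritalStatus, multipleBirthBoolean
       println(mergeTopLevel(table, incoming).fieldNames.mkString(", "))
     }
   }
   ```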




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1159671087

   @kazdy thanks for the follow-up. We had solved this issue at the root level of the schema with the code provided by @xiarixiaoyao; if you check that code (near the top of the post), it merges the table columns into the incoming DataFrame.
   As our schema is nested, we started facing similar issues with nested columns, which we have not resolved yet. And because we started adding null for the missing columns, we hit another issue: `Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array<string> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ValidateExternalType_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_1_3$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)`




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1229823108

   Thanks for fixing this, we can close the issue. 




[GitHub] [hudi] codope commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
codope commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1240323859

   Great! Gonna close this issue then. FYI, we also plan to flip the default for schema reconciliation in the next release. See #6196 




[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1240122505

   @santoshsb you need to enable schema evolution (hoodie.schema.on.read.enable) together with hoodie.datasource.write.reconcile.schema; see the following code:
   
   ```
     def perf(spark: SparkSession) = {
       import org.apache.spark.sql.SaveMode
       import org.apache.spark.sql.functions._
       import org.apache.hudi.DataSourceWriteOptions
       import org.apache.hudi.DataSourceReadOptions
       import org.apache.hudi.config.HoodieWriteConfig
       import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
       //Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few
       val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
       val sqlContext = spark.sqlContext
       import sqlContext.implicits._
       val orgStringDf = spark.read.json(Seq(orgString).toDS)
   
       //Specify common DataSourceWriteOptions in the single hudiOptions variable
   
       val hudiOptions = Map[String,String](
         HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
         DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
   
       //Write the orgStringDf to a Hudi table
       orgStringDf.write
         .format("org.apache.hudi")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
         .options(hudiOptions)
         .mode(SaveMode.Overwrite)
         .save("/work/data/updateTst/hudi/json_schema_tst")
       //Read the Hudi table
       val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
       //Printschema
       patienthudi.printSchema
       //Update: Based on our usecase add a new patient resource, this resource might contain new columns and might not have existing columns (normal use case with FHIR data)
   
       val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""
   
       //Convert the new resource string into DF
       val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
   
       //Check the schema of the new resource that is being added
       updatedStringDf.printSchema
   
       //Upsert the new resource
       spark.sql("set hoodie.schema.on.read.enable=true")
       updatedStringDf.write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
         .option("hoodie.datasource.write.reconcile.schema", "true")
         .mode(SaveMode.Append)
         .save("/work/data/updateTst/hudi/json_schema_tst")
   
       //Read the Hudi table
       val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
       //Print the schema after adding the new record
       patienthudiUpdated.printSchema
   
     }
   ```
   patienthudiUpdated.schema:
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- prefix: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
   I think it should be OK now, thanks.
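   
   One way to check programmatically that a nested field such as `name.prefix` survived the upsert is to walk the DataFrame schema by path. The helper below is a sketch with hypothetical names (`NestedFieldCheck`, `hasField`); it steps through array element types transparently and matches names case-sensitively.
   
   ```scala
   import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
   
   object NestedFieldCheck {
     // Returns true if `path` (for example List("name", "prefix")) exists under `dt`.
     def hasField(dt: DataType, path: List[String]): Boolean = (dt, path) match {
       case (_, Nil)                => true                 // whole path consumed
       case (ArrayType(elem, _), _) => hasField(elem, path) // look inside array elements
       case (s: StructType, head :: tail) =>
         s.fields.find(_.name == head) match {
           case Some(f) => hasField(f.dataType, tail)
           case None    => false
         }
       case _                       => false                // path continues into a leaf type
     }
   }
   ```
   
   With the table above, `NestedFieldCheck.hasField(patienthudiUpdated.schema, List("name", "prefix"))` is expected to return true when reconciliation kept the column.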
   
   
   
   
   
   
   




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1161712629

   @xiarixiaoyao as mentioned earlier, we didn't solve the nested column case. We are currently trying to finalize a fixed schema and to use it when reading the data with Spark, so that we avoid schema evolution altogether. In case you need them, here are the two sample messages that lead to the issue.
   
   `{
     "resourceType": "Patient",
     "id": "beca9a29-49bb-40e4-adff-4dbb4d664972",
     "lastUpdated": "2022-02-14T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "name": [
       {
         "use": "official",
         "family": "Keeling57",
         "given": [
           "Serina556"
         ],
         "prefix": [
           "Ms."
         ]
       }
     ]
   }`
   
   This one is missing the prefix:
   `{
     "resourceType": "Patient",
     "id": "beca9a29-49bb-40e4-adff-4dbb4d664972",
     "lastUpdated": "2022-02-14T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "name": [
       {
         "use": "official",
         "family": "Keeling57",
         "given": [
           "Serina556"
         ]
       }
     ]
   }`
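   
   The fixed-schema approach described above can be sketched as follows: with an explicit schema passed to the JSON reader, a message without `prefix` yields the same DataFrame schema as one with it, and the missing field is read as null. `FixedSchemaRead`, `humanName`, `patientSchema`, and `readPatients` are hypothetical names, and the schema is a heavily trimmed stand-in that covers only the fields in the two messages, not the full FHIR Patient definition.
   
   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}
   import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
   
   object FixedSchemaRead {
     // One element of the `name` array, with every optional field declared up front.
     val humanName: StructType = StructType(Seq(
       StructField("family", StringType),
       StructField("given", ArrayType(StringType)),
       StructField("prefix", ArrayType(StringType)),
       StructField("use", StringType)))
   
     // Trimmed Patient schema for illustration only.
     val patientSchema: StructType = StructType(Seq(
       StructField("id", StringType),
       StructField("lastUpdated", StringType),
       StructField("name", ArrayType(humanName)),
       StructField("resourceType", StringType),
       StructField("source", StringType)))
   
     // Parse JSON strings against the fixed schema instead of inferring one per batch.
     def readPatients(spark: SparkSession, json: Seq[String]): DataFrame = {
       import spark.implicits._
       spark.read.schema(patientSchema).json(json.toDS())
     }
   }
   ```
   
   Because every batch is read with the same schema, the write path never sees a DataFrame that lacks `prefix`; the trade-off is that fields not declared in the schema are silently ignored by the reader.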
   




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1112883931

   @xiarixiaoyao We did another test, we used this JSON string 
   `{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","meta":{"extension":[{"url":"http://navify.com/fhir/StructureDefinition/createdBy","valueString":"00u4o6bbvAeNZkXKL296"},{"url":"http://navify.com/fhir/StructureDefinition/modifiedBy","valueString":"00u4o6bbvAeNZkXKL296"}],"versionId":"9","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a"},"text":{"status":"generated","div":"<divxmlns=\"http://www.w3.org/1999/xhtml\">Generatedby<ahref=\"https://github.com/synthetichealth/synthea\">Synthea</a>.Versionidentifier:v2.4.0-483-gad18e5f2\n.Personseed:8166940888549305472Populationseed:1648664610882</div>"},"extension":[{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-race","extension":[{"url":"ombCategory","valueCoding":{"system":"urn:oid:2.16.840.1.113883.6.238","code":"2106-3","display":"White"}},{"url":"text","valueString":"White"}]},{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity","extension":[{"url":"ombCategory","valueCoding":{"system":"urn:oid:2.16.840.1.113883.6.238","code":"2186-5","display":"NotHispanicorLatino"}},{"url":"text","valueString":"NotHispanicorLatino"}]},{"url":"http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName","valueString":"Deloise241Orn563"},{"url":"http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex","valueCode":"F"},{"url":"http://hl7.org/fhir/StructureDefinition/patient-birthPlace","valueAddress":{"city":"Westwood","state":"Massachusetts","country":"US"}},{"url":"http://synthetichealth.github.io/synthea/disability-adjusted-life-years","valueDecimal":0.03310522209092858},{"url":"http://synthetichealth.github.io/synthea/quality-adjusted-life-years","valueDecimal":15.966894777909072}],"identifier":[{"system":"https://github.com/synthetichealth/synthea","value":"e6a8e22f-7cf2-4f07-8ad3-ec34479124da"},{"type":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v2-0203","code":"MR","display":"MedicalRecordNumber"}],"text":"MedicalRecordNumber"},"system":"http://hospital.smarthealthit.org","value":"e6a8e22f-7cf2-4f07-8ad3-ec34479124da"},{"type":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v2-0203","code":"SS","display":"SocialSecurityNumber"}],"text":"SocialSecurityNumber"},"system":"http://hl7.org/fhir/sid/us-ssn","value":"999-35-9642"}],"name":[{"use":"official","family":"Haley279","given":["Vinita997"]}],"telecom":[{"system":"phone","value":"555-213-3658","use":"home"}],"gender":"female","birthDate":"2005-08-30","address":[{"extension":[{"url":"http://hl7.org/fhir/StructureDefinition/geolocation","extension":[{"url":"latitude","valueDecimal":42.09490663875005},{"url":"longitude","valueDecimal":-70.82289517957093}]}],"line":["691RunolfsdottirParadeApt5"],"city":"Hanson","state":"Massachusetts","country":"US"}],"maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"S","display":"NeverMarried"}],"text":"NeverMarried"},"multipleBirthBoolean":false,"communication":[{"language":{"coding":[{"system":"urn:ietf:bcp:47","code":"en-US","display":"English"}],"text":"English"}}],"managingOrganization":{"reference":"7b3f7052-123b-46b7-a8b6-a0e87daaea03","type":"Organization"}}`
   
   First we inserted it and later the same dataframe was used to update without any modifications. The same error mentioned above was thrown.
   
   Thanks,
   Santosh




[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111713903

   @santoshsb
   multipleBirthBoolean is a new column to be added, but how should its position be determined? Should it be added as the last column or somewhere else?
   
   If you can answer these questions, I think I can help you solve the problem.
   
   
   




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1238356005

   @nsivabalan @xiarixiaoyao I tested this fix locally: I checked out the latest master branch, built the code with `mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12`, and launched spark-shell with the generated jar using this command:
   **./spark-3.2.1-bin-hadoop3.2/bin/spark-shell \
     --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar` \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.hadoop.parquet.avro.write-old-list-structure=false'**
   We still face the issues mentioned above. Am I missing something here?




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1239394929

   @codope here is the output without the config mentioned above. I have also added the code I am using to test the fix.
   --------------ERROR--------------------
   `22/09/07 18:53:08 ERROR BoundedInMemoryExecutor: error producing records
   org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
   	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
   	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'prefix' not found
   	at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
   	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
   	at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
   	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:480)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
   	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
   	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
   	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
   	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
   	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
   	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
   	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
   	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
   	... 8 more
   22/09/07 18:53:09 ERROR BoundedInMemoryExecutor: error consuming records`
   
   --------------CODE---------------------
   `~/work/spark-3.2.1-bin-hadoop3.2/bin/spark-shell  --jars `ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.13.0-SNAPSHOT.jar` --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
   
   
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   //Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few
   val orgString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"],"prefix":["Ms."]}]}"""
   
   
   val orgStringDf = spark.read.json(Seq(orgString).toDS)
   
   //Specify common DataSourceWriteOptions in the single hudiOptions variable
   
   val hudiOptions = Map[String,String](
   HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
   DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
   DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
   DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
   DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
   DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
   
   //Write the orgStringDf to a Hudi table
   orgStringDf.write
   .format("org.apache.hudi")
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
   .options(hudiOptions)
   .mode(SaveMode.Overwrite)
   .save("/work/data/updateTst/hudi/json_schema_tst")
   
   //Read the Hudi table
   val patienthudi = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
   //Printschema
   patienthudi.printSchema
   
   
   //Update: Based on our usecase add a new patient resource, this resource might contain new columns and might not have existing columns (normal use case with FHIR data)
   
   val updatedString = """{"resourceType":"Patient","id":"beca9a29-49bb-40e4-adff-4dbb4d664972","lastUpdated":"2022-02-14T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","name":[{"use":"official","family":"Keeling57","given":["Serina556"]}]}"""
   
   
   //Convert the new resource string into DF
   val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
   
   //Check the schema of the new resource that is being added
   updatedStringDf.printSchema
   
   
   //Upsert the new resource
   updatedStringDf.write
   .format("org.apache.hudi")
   .options(hudiOptions)
   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
   .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
   .mode(SaveMode.Append)
   .save("/work/data/updateTst/hudi/json_schema_tst")
   
   //Read the Hudi table
   val patienthudiUpdated = spark.read.format("hudi").load("/work/data/updateTst/hudi/json_schema_tst")
   
   //Print the schema after adding the new record
   patienthudiUpdated.printSchema`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xiarixiaoyao commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1161139148

   @santoshsb 
   Could you please share the code for the nested columns case? Thanks.
   I think maybe we can solve this problem together with https://issues.apache.org/jira/browse/HUDI-4276




[GitHub] [hudi] kazdy commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1172376069

   hi @santoshsb, take a look at this nice PR by @xiarixiaoyao:
   https://github.com/apache/hudi/pull/6017




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111697151

   Thanks @yihua, here are the detailed spark shell commands we used 
   
   `./spark-shell --jars '/Users/balamats/work/hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0-SNAPSHOT.jar' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
   
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   //Define a Patient FHIR resource, for simplicity have deleted most of the elements and retained a few
   val orgString = """{"resourceType":"Patient","id":"4ad86a5c-926e-439b-9352-f8ac9ab780f1","lastUpdated":"2022-03-11T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd21481","gender":"male","birthDate":"1974-01-05","maritalStatus":{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/v3-MaritalStatus","code":"M","display":"M"}],"text":"M"}}"""
   
   //Convert to dataframe
   val orgStringDf = spark.read.json(Seq(orgString).toDS)
   
   //Specify common DataSourceWriteOptions in the single hudiOptions variable
   val hudiOptions = Map[String,String](
     HoodieWriteConfig.TABLE_NAME -> "patient_hudi",
     DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
     DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
     DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "source",
     DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "lastUpdated",
    DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
   )
   
   //Write the orgStringDf to a Hudi table
   orgStringDf.write
       .format("org.apache.hudi")
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Read the Hudi table
   val patienthudi  = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Printschema
   patienthudi.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
    //Select fields to verify
    patienthudi.select("id","gender","maritalStatus").show(false)
   +------------------------------------+------+---------------------------------------------------------------------+
   |id                                  |gender|maritalStatus                                                        |
   +------------------------------------+------+---------------------------------------------------------------------+
   |4ad86a5c-926e-439b-9352-f8ac9ab780f1|male  |{[{M, M, http://terminology.hl7.org/CodeSystem/v3-MaritalStatus}], M}|
   +------------------------------------+------+---------------------------------------------------------------------+
   
   //Update: Based on our use case, add a new patient resource; this resource might contain new columns and might lack existing columns (normal with FHIR data)
   
   val updatedString =  """{"resourceType":"Patient","id":"596c7a94-bada-4303-85d4-7067c586999e","lastUpdated":"2022-04-20T15:18:18.90836+05:30","source":"4a0701fe-5c3b-482b-895d-875fcbd2148a","gender":"female","birthDate":"2005-08-30","multipleBirthBoolean":true}"""
   
   //Convert the new resource string into DF
   val updatedStringDf = spark.read.json(Seq(updatedString).toDS)
   
   //Check the schema of the new resource that is being added
   updatedStringDf.printSchema
   root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
   
   //Upsert the new resource
   updatedStringDf.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
       .mode(SaveMode.Append)
       .save("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Read the Hudi table
   val patienthudi  = spark.read.format("hudi").load("/Users/balamats/work/data/updateTst/json_schema_tst/hudi")
   
   //Print the schema after adding the new record
   patienthudi.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
    //Select fields to verify
    patienthudi.select("id","gender","maritalStatus").show(false)
   org.apache.spark.sql.AnalysisException: cannot resolve 'maritalStatus' given input columns: [_hoodie_commit_seqno, _hoodie_commit_time, _hoodie_file_name, _hoodie_partition_path, _hoodie_record_key, birthDate, gender, id, lastUpdated, multipleBirthBoolean, resourceType, source];
   'Project [id#130, gender#129, 'maritalStatus]
   +- Relation [_hoodie_commit_time#123,_hoodie_commit_seqno#124,_hoodie_record_key#125,_hoodie_partition_path#126,_hoodie_file_name#127,birthDate#128,gender#129,id#130,lastUpdated#131,multipleBirthBoolean#132,resourceType#133,source#134] parquet
   
     at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:209)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:209)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
     at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
     at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
     at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
     at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
     at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3734)
     at org.apache.spark.sql.Dataset.select(Dataset.scala:1454)
     at org.apache.spark.sql.Dataset.select(Dataset.scala:1471)
     ... 49 elided`
   
   Our expectation after adding the second row was:
   1. The new column "multipleBirthBoolean" is added to the schema and is null for the previous entry.
   2. The existing "maritalStatus" column, added to the destination schema by the first entry, is still present after adding the second entry and is null for the second entry.
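
For reference, the two expectations above match what plain Spark produces when two DataFrames are combined by column name. The following is a minimal sketch only, assuming Spark 3.1 or later (for `allowMissingColumns`) and using shortened, made-up sample records rather than the full Patient resources. It does not involve Hudi and says nothing about how Hudi merges schemas on upsert.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Local session for illustration; in spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[1]").appName("expected-schema-sketch").getOrCreate()

// Shortened stand-ins for the two Patient records in this thread (assumed sample data).
val firstJson  = """{"id":"a","gender":"male","maritalStatus":{"text":"M"}}"""
val secondJson = """{"id":"b","gender":"female","multipleBirthBoolean":true}"""

val first  = spark.read.json(spark.createDataset(Seq(firstJson))(Encoders.STRING))
val second = spark.read.json(spark.createDataset(Seq(secondJson))(Encoders.STRING))

// Union by name; a top-level column missing on one side is filled with null on that side.
val merged = first.unionByName(second, allowMissingColumns = true)
merged.printSchema()
merged.show(false)
```

The merged schema keeps both "maritalStatus" and "multipleBirthBoolean", with each one null for the record that did not carry it.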
   
   We might be missing some config. Alternatively, perhaps every new entry must contain all the columns present in the destination schema, even if their values are NULL. If we do need such an uber schema, we could not find the Spark code to add those columns with NULL values to our second dataframe "updatedStringDf", that is, to read the uber schema and merge it into "updatedStringDf". We did try the following command while creating the second dataframe:
   
   `val updatedStringDf = spark.read.schema(patienthudi.schema).json(Seq(updatedString).toDS)`
   
   But then the new schema for updatedStringDf misses the "multipleBirthBoolean" column present in the second entry.
   
   `root
    |-- birthDate: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- maritalStatus: struct (nullable = true)
    |    |-- coding: array (nullable = true)
    |    |    |-- element: struct (containsNull = true)
    |    |    |    |-- code: string (nullable = true)
    |    |    |    |-- display: string (nullable = true)
    |    |    |    |-- system: string (nullable = true)
    |    |-- text: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)`
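
One way to get both sets of columns into the incoming DataFrame is to infer its schema as usual and then add whatever top-level columns the table has and the new record lacks, as typed nulls. The sketch below is an illustration under assumptions, not a Hudi API: `addMissingColumns` is a hypothetical helper name, the sample records are shortened stand-ins, columns whose names start with `_hoodie_` are assumed to be Hudi metadata and are skipped, and differences inside nested structs are not handled.

```scala
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// Hypothetical helper: add every top-level column of `target` that `df` lacks,
// as a null cast to the target type. Columns present only in `df` are kept.
def addMissingColumns(df: DataFrame, target: StructType): DataFrame =
  target.fields
    .filterNot(f => f.name.startsWith("_hoodie_"))   // assumed Hudi metadata prefix
    .filterNot(f => df.columns.contains(f.name))
    .foldLeft(df)((acc, f) => acc.withColumn(f.name, lit(null).cast(f.dataType)))

val spark = SparkSession.builder().master("local[1]").appName("add-missing-columns-sketch").getOrCreate()

// Stand-in for the schema read back from the table (assumed sample data).
val tableLike = spark.read.json(
  spark.createDataset(Seq("""{"id":"a","maritalStatus":{"text":"M"}}"""))(Encoders.STRING))

// Stand-in for the new record, which has a new column and lacks an old one.
val incoming = spark.read.json(
  spark.createDataset(Seq("""{"id":"b","multipleBirthBoolean":true}"""))(Encoders.STRING))

val aligned = addMissingColumns(incoming, tableLike.schema)
aligned.printSchema()
```

Unlike `spark.read.schema(...)`, this keeps "multipleBirthBoolean" because the incoming schema is inferred first and only extended afterwards. Whether an upsert of such a DataFrame then preserves both columns in the table depends on the Hudi version and configuration in use.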
   
   Thanks for the help.
   Santosh
   




[GitHub] [hudi] nsivabalan commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1229544993

   @santoshsb: is there anything more to be addressed, or can we close out the issue? I see the PR is merged.




[GitHub] [hudi] xiarixiaoyao closed issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
xiarixiaoyao closed issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.
URL: https://github.com/apache/hudi/issues/5452




[GitHub] [hudi] yihua commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
yihua commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1111532666

   @santoshsb could you post the Hudi write config used to write the table and the commands to reproduce the problem?  @xiarixiaoyao could you provide some insights around schema evolution?




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1112964120

   We then removed the coding type from our JSON string one element at a time. The update worked for 2 elements (identifier and maritalStatus); it is the coding type in the element "communication" that breaks it. So when we use this JSON string 
   `{
     "resourceType": "Patient",
     "id": "596c7a94-bada-4303-85d4-7067c586999e",
     "lastUpdated": "2022-04-20T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "meta": {
       "extension": [
         {
           "url": "http://navify.com/fhir/StructureDefinition/createdBy",
           "valueString": "00u4o6bbvAeNZkXKL296"
         },
         {
           "url": "http://navify.com/fhir/StructureDefinition/modifiedBy",
           "valueString": "00u4o6bbvAeNZkXKL296"
         }
       ],
       "versionId": "9",
       "lastUpdated": "2022-04-20T15:18:18.90836+05:30",
       "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a"
     },
     "text": {
       "status": "generated",
       "div": "<divxmlns=\"http://www.w3.org/1999/xhtml\">Generatedby<ahref=\"https://github.com/synthetichealth/synthea\">Synthea</a>.Versionidentifier:v2.4.0-483-gad18e5f2\n.Personseed:8166940888549305472Populationseed:1648664610882</div>"
     },
     "extension": [
       {
         "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race",
         "extension": [
           {
             "url": "ombCategory",
             "valueCoding": {
               "system": "urn:oid:2.16.840.1.113883.6.238",
               "code": "2106-3",
               "display": "White"
             }
           },
           {
             "url": "text",
             "valueString": "White"
           }
         ]
       },
       {
         "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity",
         "extension": [
           {
             "url": "ombCategory",
             "valueCoding": {
               "system": "urn:oid:2.16.840.1.113883.6.238",
               "code": "2186-5",
               "display": "NotHispanicorLatino"
             }
           },
           {
             "url": "text",
             "valueString": "NotHispanicorLatino"
           }
         ]
       },
       {
         "url": "http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName",
         "valueString": "Deloise241Orn563"
       },
       {
         "url": "http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex",
         "valueCode": "F"
       },
       {
         "url": "http://hl7.org/fhir/StructureDefinition/patient-birthPlace",
         "valueAddress": {
           "city": "Westwood",
           "state": "Massachusetts",
           "country": "US"
         }
       },
       {
         "url": "http://synthetichealth.github.io/synthea/disability-adjusted-life-years",
         "valueDecimal": 0.03310522209092858
       },
       {
         "url": "http://synthetichealth.github.io/synthea/quality-adjusted-life-years",
         "valueDecimal": 15.966894777909072
       }
     ],
     "name": [
       {
         "use": "official",
         "family": "Haley279",
         "given": [
           "Vinita997"
         ]
       }
     ],
     "telecom": [
       {
         "system": "phone",
         "value": "555-213-3658",
         "use": "home"
       }
     ],
     "gender": "female",
     "birthDate": "2005-08-30",
     "address": [
       {
         "extension": [
           {
             "url": "http://hl7.org/fhir/StructureDefinition/geolocation",
             "extension": [
               {
                 "url": "latitude",
                 "valueDecimal": 42.09490663875005
               },
               {
                 "url": "longitude",
                 "valueDecimal": -70.82289517957093
               }
             ]
           }
         ],
         "line": [
           "691RunolfsdottirParadeApt5"
         ],
         "city": "Hanson",
         "state": "Massachusetts",
         "country": "US"
       }
     ],
     "multipleBirthBoolean": false,
     "communication": [
       {
         "language": {
           "coding": [
             {
               "system": "urn:ietf:bcp:47",
               "code": "en-US",
               "display": "English"
             }
           ],
           "text": "English"
         }
       }
     ],
     "managingOrganization": {
       "reference": "7b3f7052-123b-46b7-a8b6-a0e87daaea03",
       "type": "Organization"
     }
   }`
   
   When we insert this single record and then update the same record, we see the error.
   
   Thanks,
   Santosh




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1115996169

   @xiarixiaoyao FYI, we just tested this issue by building the release branch 0.11.0, here is the JSON string and the schema.
   `{
     "resourceType": "Patient",
     "id": "596c7a94-bada-4303-85d4-7067c586999e",
     "lastUpdated": "2022-04-20T15:18:18.90836+05:30",
     "source": "4a0701fe-5c3b-482b-895d-875fcbd2148a",
     "name": [
       {
         "use": "official",
         "family": "Haley279",
         "given": [
           "Vinita997"
         ]
       }
     ],
     "gender": "female",
     "birthDate": "2005-08-30",
     "multipleBirthBoolean": false,
     "communication": [
       {
         "language": {
           "coding": [
             {
               "system": "urn:ietf:bcp:47",
               "code": "en-US",
               "display": "English"
             }
           ],
           "text": "English"
         }
       }
     ]
   }`
   
   SCHEMA
   
   scala> hudiPatientDF.printSchema
   root
    |-- _hoodie_commit_time: string (nullable = true)
    |-- _hoodie_commit_seqno: string (nullable = true)
    |-- _hoodie_record_key: string (nullable = true)
    |-- _hoodie_partition_path: string (nullable = true)
    |-- _hoodie_file_name: string (nullable = true)
    |-- birthDate: string (nullable = true)
    |-- communication: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- language: struct (nullable = true)
    |    |    |    |-- coding: array (nullable = true)
    |    |    |    |    |-- element: struct (containsNull = true)
    |    |    |    |    |    |-- code: string (nullable = true)
    |    |    |    |    |    |-- display: string (nullable = true)
    |    |    |    |    |    |-- system: string (nullable = true)
    |    |    |    |-- text: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- id: string (nullable = true)
    |-- lastUpdated: string (nullable = true)
    |-- multipleBirthBoolean: boolean (nullable = true)
    |-- name: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- family: string (nullable = true)
    |    |    |-- given: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- use: string (nullable = true)
    |-- resourceType: string (nullable = true)
    |-- source: string (nullable = true)
   
   We inserted the above JSON string and then updated the same record, and it threw the following error, which is the same as above.
   
   `Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:105)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     ... 4 more
   Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'coding' not found
     at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
     at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
     at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
     at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
     at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
     at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)`
   
   Thanks,
   Santosh




[GitHub] [hudi] santoshsb commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
santoshsb commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1117242801

   @xiarixiaoyao @yihua we currently don't see this issue when we use the following configuration option: --conf 'spark.hadoop.parquet.avro.write-old-list-structure=false'. It should be good to close this. We are facing another issue with nested columns, though; shall we raise a different ticket for that?
   
   Thanks,
   Santosh
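
For anyone running a standalone application rather than spark-shell, the same option can presumably be set when the session is built. This is a minimal sketch, assuming a fresh JVM with no SparkContext yet; the application name and local master are placeholders. Spark copies `spark.hadoop.*` properties into the Hadoop configuration with the prefix removed when the context is created, so setting the option on an already-running session may have no effect.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder app name and master; the two config keys are the ones used in this thread.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("write-old-list-structure-sketch")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.hadoop.parquet.avro.write-old-list-structure", "false")
  .getOrCreate()

// Read the value back from the Hadoop configuration that Parquet writers see.
val effective = spark.sparkContext.hadoopConfiguration.get("parquet.avro.write-old-list-structure")
println(s"parquet.avro.write-old-list-structure = $effective")
```

Reading the value back is only a sanity check that the setting reached the Hadoop configuration; it does not by itself confirm that the upsert error is gone.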




[GitHub] [hudi] kazdy commented on issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
kazdy commented on issue #5452:
URL: https://github.com/apache/hudi/issues/5452#issuecomment-1159291914

   @santoshsb will something like this help with your use case:
   #5873 
   https://issues.apache.org/jira/browse/HUDI-4276
   ?




[GitHub] [hudi] codope closed issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.

Posted by GitBox <gi...@apache.org>.
codope closed issue #5452: Schema Evolution: Missing column for previous records when new entry does not have the same while upsert.
URL: https://github.com/apache/hudi/issues/5452

