You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/13 05:48:11 UTC

[GitHub] [hudi] PhantomHunt opened a new issue, #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

PhantomHunt opened a new issue, #6936:
URL: https://github.com/apache/hudi/issues/6936

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? - Yes
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   We are upserting records in non-partioned table using following options primarily to test multi-modal indexing where we are using Apache Hudi latest version - 0.12.0 via AWS Glue.
   The job is working fine when records are inserted for the first time but from second iteration onwards  with same records, we are getting an error - "An error occurred while calling o130.save. java.lang.NullPointerException"
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   Upsert data in a non partitioned Hudi table with following options -
   
   hudi_write_options_no_partition = {
       "hoodie.table.name": noPartitionHudiTableName,
       "hoodie.datasource.write.recordkey.field": "VODSRID",
       'hoodie.datasource.write.table.name': noPartitionHudiTableName,
       'hoodie.datasource.write.precombine.field': 'LastUpdatedOn',
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.metadata.enable':'true',
       'hoodie.metadata.index.bloom.filter.enable' : 'true',
       'hoodie.metadata.index.column.stats.enable' : 'true'
   }
   (
     df_DB_dept.write.format("org.apache.hudi")
     .option('hoodie.datasource.write.operation', 'upsert')
     .options(**hudi_write_options_no_partition)
     .mode("append")
     .save(table_path)
   )
   
   **Expected behavior**
   
   We are only able to insert fresh records. Getting error from second iteration with same set of records. We want successful upsert to happen from second iteration onwards which is not happening currently. 
   
   **Environment Description**
   
   * Hudi version : 0.12.0
   
   * AWS Glue version :  Glue 3.0
   
   * Spark version :  3.1
   
   * Python version :  3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   **Additional context**
   
   When we remove the option - 'hoodie.metadata.index.column.stats.enable' : 'true', upserts work properly for all iterations.
   
   **Stacktrace**
   
   2022-10-12 11:14:05,442 ERROR [main] glueexceptionanalysis.GlueExceptionAnalysisListener (Logging.scala:logError(9)): [Glue Exception Analysis] {"Event":"GlueETLJobExceptionEvent","Timestamp":1665573245439,"Failure Reason":"Traceback (most recent call last):\n  File \"/tmp/Indexing_test_5billion.py\", line 106, in <module>\n    .save(table_path)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py\", line 1109, in save\n    self._jwrite.save(path)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 111, in deco\n    return f(*a, **kw)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 328, in get_return_value\n    format(target_id, \".\", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o130.save.\n: org.ap
 ache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 4 times, most recent failure: Lost task 0.3 in stage 23.0 (TID 654) (10.218.20.37 executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)\n\tat org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)\n\tat org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(M
 apPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)\n\tat org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)\n\tat org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)\n\tat org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)\n\tat org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)\n\tat org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:335)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD
 .scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to s3 PATH
   org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:410)\n\tat org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:382)\n\tat org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:84)\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)\n\t... 28 more\nCaused by: java.lang.NullPointerException\n\tat org.apache.hudi.avro.HoodieAvroUtils.convertValueForAvroLogicalTypes(HoodieAvroUtils.java:646)\n\tat org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes(HoodieAvroUtils.java:620)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$null$1(HoodieTableMetadataUtil.java:147)\n\tat java.util.ArrayList.forEach(ArrayList.java:1259)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$collectColumnRangeMetadata$2(HoodieTableMetadataUtil.java:142)\n\t
 at java.util.ArrayList.forEach(ArrayList.java:1259)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata(HoodieTableMetadataUtil.java:139)\n\tat org.apache.hudi.io.HoodieAppendHandle.processAppendResult(HoodieAppendHandle.java:363)\n\tat org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:405)\n\t... 31 more\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)\n\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)\n\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala
 :2413)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2278
 )\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)\n\tat org.apache.spark.rdd.RDD.count(RDD.scala:1253)\n\tat org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:696)\n\tat org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)\n\tat org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)\n\tat or
 g.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat 
 org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(N
 ativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244
 )\n\tat org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)\n\tat org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)\n\tat org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)\n\tat org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManage
 r.scala:1350)\n\tat org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)\n\tat org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)\n\tat org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:335)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor
 $Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\nCaused by: org.apache.hudi.exception.HoodieAppendException: Failed while appending records to S3 PATH org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:410)\n\tat org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:382)\n\tat org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:84)\n\tat org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)\n\t... 28 more\nCaused by: java.lang.NullPointerException\n\tat org.apache.hudi.avro.HoodieAvroUtils.convertValueForAvroLogicalTypes(HoodieAvroUtils.java:646)\n\tat org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes(HoodieAvroUtils.java:620)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$null$1(HoodieTableMetadataUtil.java:147)\n\tat java.util.ArrayList.f
 orEach(ArrayList.java:1259)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.lambda$collectColumnRangeMetadata$2(HoodieTableMetadataUtil.java:142)\n\tat java.util.ArrayList.forEach(ArrayList.java:1259)\n\tat org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata(HoodieTableMetadataUtil.java:139)\n\tat org.apache.hudi.io.HoodieAppendHandle.processAppendResult(HoodieAppendHandle.java:363)\n\tat org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:405)\n\t... 31 more\n","Stack Trace":[{
       "Declaring Class": "get_return_value",
       "Method Name": "format(target_id, \".\", name), value)",
       "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
       "Line Number": 328
   },{
       "Declaring Class": "deco",
       "Method Name": "return f(*a, **kw)",
       "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
       "Line Number": 111
   },{
       "Declaring Class": "__call__",
       "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
       "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
       "Line Number": 1305
   },{
       "Declaring Class": "save",
       "Method Name": "self._jwrite.save(path)",
       "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
       "Line Number": 1109
   },{
       "Declaring Class": "<module>",
       "Method Name": ".save(table_path)",
       "File Name": "/tmp/Indexing_test_5billion.py",
       "Line Number": 106
   }],"Last Executed Line number":106,"script":"Indexing_test_5billion.py"}
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1332910935

   Closing this. Please follow HUDI-5291.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] yihua commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
yihua commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1278600500

   @PhantomHunt Thanks for reporting the issue.  If possible, could you share the schema and sample data that can reproduce this issue?  The issue is likely related to a specific data type.  You may strip out any sensitive columns and data that are irrelevant to the issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin closed issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
alexeykudinkin closed issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.
URL: https://github.com/apache/hudi/issues/6936


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] PhantomHunt commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
PhantomHunt commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1332212753

   Ok Thanks Team Hudi! Will try the solution and let you know.
   
   On Tue, Nov 29, 2022 at 12:45 PM Sagar Sumit ***@***.***>
   wrote:
   
   > I think the issue is due to timestamp type field with null value. The
   > reason it is not reproducible during first insert is that the records go
   > through HoodieCreateHandle which does not merge column stats in the first
   > insert. Upon subsequent upsert, records go through HoodieAppendHandle
   > which attempts to merge column stats but then fails for timestamp type if
   > value is null. See below script to repro:
   >
   > >>> from pyspark.sql.types import StructType,StructField, TimestampType, StringType
   > >>> schema = StructType([
   > ...   StructField('TimePeriod', StringType(), True),
   > ...   StructField('StartTimeStamp', TimestampType(), True),
   > ...   StructField('EndTimeStamp', TimestampType(), True)
   > ... ])
   > >>> import time
   > >>> import datetime
   > >>> timestamp = datetime.datetime.strptime('16:00:00:00',"%H:%M:%S:%f")
   > >>> timestamp2 = datetime.datetime.strptime('18:59:59:59',"%H:%M:%S:%f")
   > >>> columns = ['TimePeriod', 'StartTimeStamp', 'EndTimeStamp']
   > >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, timestamp2 )]
   > >>> df2 = spark.createDataFrame(data,schema)
   > >>> df2.printSchema()
   > root
   >  |-- TimePeriod: string (nullable = true)
   >  |-- StartTimeStamp: timestamp (nullable = true)
   >  |-- EndTimeStamp: timestamp (nullable = true)
   >
   > >>> hudi_write_options_no_partition = {
   > ... "hoodie.table.name": tableName,
   > ... "hoodie.datasource.write.recordkey.field": "TimePeriod",
   > ... 'hoodie.datasource.write.table.name': tableName,
   > ... 'hoodie.datasource.write.precombine.field': 'EndTimeStamp',
   > ... 'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
   > ... 'hoodie.metadata.enable':'true',
   > ... 'hoodie.metadata.index.bloom.filter.enable' : 'true',
   > ... 'hoodie.metadata.index.column.stats.enable' : 'true'
   > ... }
   > >>> df2.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("overwrite").save(basePath)
   > 22/11/29 07:00:33 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
   > 22/11/29 07:00:33 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
   > 22/11/29 07:00:34 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_cow/.hoodie/metadata
   > [Stage 7:>                                                          (0 + 1) / 1]
   >
   > // update data with non-null timestamp will succeed
   > >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, datetime.datetime.strptime('19:59:59:59',"%H:%M:%S:%f"))]
   > >>> updateDF = spark.createDataFrame(data,schema)
   > >>> updateDF.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("append").save(basePath)
   >
   > // update data with null timestamp will throw exception
   > >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, None)]
   > >>> updateDF = spark.createDataFrame(data,schema)
   > >>> updateDF.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("append").save(basePath)
   >
   > I would suggest to clean the data if possible, replace nulls in the
   > dataframe by oldest unix timestamp or some default value that is suitable
   > to usecase. Ideally, this should be handled in
   > HoodieTableMetadataUtil#collectColumnRangeMetadata.
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/hudi/issues/6936#issuecomment-1330186251>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AITJTTIUBF3DPAT5WETW65DWKWUPHANCNFSM6AAAAAARD5FFWQ>
   > .
   > You are receiving this because you were mentioned.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1283158854

   @PhantomHunt : gentle ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] PhantomHunt commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
PhantomHunt commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1286006358

   Hi,
   
   Apologies for delayed response.
   
   Since the table contains my organization's sensitive information, I can
   only share the schema of the table.
   
   Postgresql create table script:
   
   CREATE TABLE IF NOT EXISTS myschema."VODProgramme"
   (
       "VODSRID" bigint,
       "ProgramVODSRID" bigint,
       "SeasonVODSRID" bigint,
       "ProgrammeId" character varying(400) COLLATE pg_catalog."default",
       "ProgrammeName" character varying(400) COLLATE pg_catalog."default",
       "ProgrammeLanguage" character varying(400) COLLATE pg_catalog."default",
       "ProgrammeGenre" character varying(400) COLLATE pg_catalog."default",
       "Synopsis" character varying(4000) COLLATE pg_catalog."default",
       "OperatorId" integer,
       "ProgramImage" character varying(1000) COLLATE pg_catalog."default",
       "ProgrammeUrl" character varying(1000) COLLATE pg_catalog."default",
       "Director" character varying(600) COLLATE pg_catalog."default",
       "ContentType" character varying(100) COLLATE pg_catalog."default",
       "ContentTypeID" smallint,
       "Cast" text COLLATE pg_catalog."default",
       "ProgramReleaseDate" date,
       "ChannelName" character varying(100) COLLATE pg_catalog."default",
       "SourceSeasonid" character varying(500) COLLATE pg_catalog."default",
       "SeasonName" character varying(400) COLLATE pg_catalog."default",
       "SeasonImage" character varying(1000) COLLATE pg_catalog."default",
       "EpisodeCount" integer,
       "SeasonCount" integer,
       "SourceEpisodeid" character varying(500) COLLATE pg_catalog."default",
       "EpisodeNumber" double precision,
       "EpisodeTitle" character varying(400) COLLATE pg_catalog."default",
       "EpisodeDescription" character varying(4000) COLLATE
   pg_catalog."default",
       "EpisodeReleasedate" date,
       "EpisodeImage" character varying(1000) COLLATE pg_catalog."default",
       "EpisodeVideoUrl" character varying(1000) COLLATE pg_catalog."default",
       "SocialType" character varying(50) COLLATE pg_catalog."default",
       "SocialCount" character varying(50) COLLATE pg_catalog."default",
       "ParentalRating" character varying(50) COLLATE pg_catalog."default",
       "SeasonNumber" integer,
       "Duration" character varying(50) COLLATE pg_catalog."default",
       "EpisodeGenre" character varying(400) COLLATE pg_catalog."default",
       "ProductionYear" character varying(50) COLLATE pg_catalog."default",
       "CommercialModel" character varying(50) COLLATE pg_catalog."default",
       "Commercials" character varying(50) COLLATE pg_catalog."default",
       "StartDate" date,
       "EndDate" date,
       "LastUpdatedOn" timestamp without time zone,
       "CreatedDate" timestamp without time zone,
       "CMSProgrammeID" bigint,
       "CatalogStatusID" smallint,
       "CMSLastUpdatedOn" timestamp without time zone,
       "VCMSLastUpdatedOn" timestamp without time zone,
       "ProgramAndroidDeeplink" character varying(1000) COLLATE
   pg_catalog."default",
       "SeasonAndroidDeeplink" character varying(1000) COLLATE
   pg_catalog."default",
       "EpisodeAndroidDeeplink" character varying(1000) COLLATE
   pg_catalog."default",
       "UDISNoofEpisodes" integer,
       "ApprovedStatus" smallint,
       "EpisodeiosDeeplink" character varying(1000) COLLATE
   pg_catalog."default",
       "SeasoniosDeeplink" character varying(1000) COLLATE
   pg_catalog."default",
       "ProgramiosDeeplink" character varying(1000) COLLATE
   pg_catalog."default"
   )
   
   We have created the table via Athena. PFB the DDL of the same:
   
   CREATE EXTERNAL TABLE `vodprogramme_nonpartition`(
     `vodsrid` bigint,
     `programvodsrid` bigint,
     `seasonvodsrid` bigint,
     `programmeid` string,
     `programmename` string,
     `programmelanguage` string,
     `programmegenre` string,
     `synopsis` string,
     `operatorid` int,
     `programimage` string,
     `programmeurl` string,
     `director` string,
     `contenttype` string,
     `contenttypeid` int,
     `cast` string,
     `programreleasedate` date,
     `channelname` string,
     `sourceseasonid` string,
     `seasonname` string,
     `seasonimage` string,
     `episodecount` int,
     `seasoncount` int,
     `sourceepisodeid` string,
     `episodenumber` double,
     `episodetitle` string,
     `episodedescription` string,
     `episodereleasedate` date,
     `episodeimage` string,
     `episodevideourl` string,
     `socialtype` string,
     `socialcount` string,
     `parentalrating` string,
     `seasonnumber` int,
     `duration` string,
     `episodegenre` string,
     `productionyear` string,
     `commercialmodel` string,
     `commercials` string,
     `startdate` date,
     `enddate` date,
     `lastupdatedon` timestamp,
     `createddate` timestamp,
     `cmsprogrammeid` bigint,
     `catalogstatusid` int,
     `cmslastupdatedon` timestamp,
     `vcmslastupdatedon` timestamp,
     `programandroiddeeplink` string,
     `seasonandroiddeeplink` string,
     `episodeandroiddeeplink` string,
     `udisnoofepisodes` int,
     `approvedstatus` int,
     `episodeiosdeeplink` string,
     `seasoniosdeeplink` string,
     `programiosdeeplink` string)
   ROW FORMAT SERDE
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT
     'org.apache.hudi.hadoop.HoodieParquetInputFormat'
   OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     'S3_PATH'
   
   
   Note: There are around 2 million records in this table. For fresh
   insertion, we are not getting any errors but when we are upserting the
   same table again, we are getting this error. This happens for
   partitioned tables -> PARTITIONED BY (`operatorid` int)) as well for
   same dataset.
   
   *Warm regards*
   
   *B M Vinjit *
   
   Software Developer
   
   B.Sc(Hons.) CS (DU) | MCA (NIT Kurukshetra )
   
   Phone: +91 8527467123
   
   *LinkedIn <https://www.linkedin.com/in/vinjit/> *
   
   
   On Fri, Oct 14, 2022 at 12:59 PM Y Ethan Guo ***@***.***>
   wrote:
   
   > @PhantomHunt <https://github.com/PhantomHunt> Thanks for reporting the
   > issue. If possible, could you share the schema and sample data that can
   > reproduce this issue? The issue is likely related to a specific data type.
   > You may strip out any sensitive columns and data that are irrelevant to the
   > issue.
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/hudi/issues/6936#issuecomment-1278600500>, or
   > unsubscribe
   > <https://github.com/notifications/unsubscribe-auth/AITJTTPDMT4CHR5DR6OSGXLWDEDT7ANCNFSM6AAAAAARD5FFWQ>
   > .
   > You are receiving this because you were mentioned.Message ID:
   > ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope commented on issue #6936: [SUPPORT] NPE when trying to upsert with option hoodie.metadata.index.column.stats.enable : true.

Posted by GitBox <gi...@apache.org>.
codope commented on issue #6936:
URL: https://github.com/apache/hudi/issues/6936#issuecomment-1330186251

   I think the issue is due to timestamp type field with null value. The reason it is not reproducible during first insert is that the records go through `HoodieCreateHandle` which does not merge column stats in the first insert. Upon subsequent upsert, records go through `HoodieAppendHandle` which attempts to merge column stats but then fails for timestamp type if value is null. See below script to repro:
   ```
   >>> from pyspark.sql.types import StructType,StructField, TimestampType, StringType
   >>> schema = StructType([
   ...   StructField('TimePeriod', StringType(), True),
   ...   StructField('StartTimeStamp', TimestampType(), True),
   ...   StructField('EndTimeStamp', TimestampType(), True)
   ... ])
   >>> import time
   >>> import datetime
   >>> timestamp = datetime.datetime.strptime('16:00:00:00',"%H:%M:%S:%f")
   >>> timestamp2 = datetime.datetime.strptime('18:59:59:59',"%H:%M:%S:%f")
   >>> columns = ['TimePeriod', 'StartTimeStamp', 'EndTimeStamp']
   >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, timestamp2 )]
   >>> df2 = spark.createDataFrame(data,schema)
   >>> df2.printSchema()
   root
    |-- TimePeriod: string (nullable = true)
    |-- StartTimeStamp: timestamp (nullable = true)
    |-- EndTimeStamp: timestamp (nullable = true)
   
   >>> hudi_write_options_no_partition = {
   ... "hoodie.table.name": tableName,
   ... "hoodie.datasource.write.recordkey.field": "TimePeriod",
   ... 'hoodie.datasource.write.table.name': tableName,
   ... 'hoodie.datasource.write.precombine.field': 'EndTimeStamp',
   ... 'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
   ... 'hoodie.metadata.enable':'true',
   ... 'hoodie.metadata.index.bloom.filter.enable' : 'true',
   ... 'hoodie.metadata.index.column.stats.enable' : 'true'
   ... }
   >>> df2.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("overwrite").save(basePath)
   22/11/29 07:00:33 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
   22/11/29 07:00:33 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
   22/11/29 07:00:34 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_cow/.hoodie/metadata
   [Stage 7:>                                                          (0 + 1) / 1]
   
   // update data with non-null timestamp will succeed
   >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, datetime.datetime.strptime('19:59:59:59',"%H:%M:%S:%f"))]
   >>> updateDF = spark.createDataFrame(data,schema)
   >>> updateDF.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("append").save(basePath)
   
   // update data with null timestamp will throw exception
   >>> data = [("16:00:00:00 -> 18:59:59:59", timestamp, None)]
   >>> updateDF = spark.createDataFrame(data,schema)
   >>> updateDF.write.format("org.apache.hudi").options(**hudi_write_options_no_partition).mode("append").save(basePath)
   ```
   
   I would suggest to clean the data if possible, replace nulls in the dataframe by oldest unix timestamp or some default value that is suitable to usecase. Ideally, this should be handled in `HoodieTableMetadataUtil#collectColumnRangeMetadata`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org