Posted to commits@hudi.apache.org by yu...@apache.org on 2022/10/06 13:08:00 UTC

[hudi] 04/07: [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 60edf301d7cec9f928c743f4e499389c4525e9aa
Author: komao <ma...@gmail.com>
AuthorDate: Thu Jun 30 04:29:33 2022 +0800

    [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
    
    Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
 .../sql/hudi/command/procedures/ExportInstantsProcedure.scala |  9 ++++++---
 .../procedures/ShowHoodieLogFileMetadataProcedure.scala       |  8 ++++++--
 .../procedures/ShowHoodieLogFileRecordsProcedure.scala        | 11 +++++++----
 3 files changed, 19 insertions(+), 9 deletions(-)
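
For context, the API change this commit adapts to: on the RFC-46 branch,
HoodieDataBlock#getRecordIterator no longer has a zero-argument form that
yields raw Avro IndexedRecords. Callers must now pass a HoodieRecord.Mapper
that lifts each deserialized IndexedRecord into a HoodieRecord; the three
procedures below all wrap records in HoodieAvroIndexedRecord. A minimal
sketch of the before/after call site (dataBlock stands in for any
HoodieDataBlock obtained from a HoodieLogFormat reader):

    import org.apache.avro.generic.IndexedRecord
    import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieRecord}

    // Before: val recordItr = dataBlock.getRecordIterator
    // After: supply a Mapper deciding how each IndexedRecord becomes a HoodieRecord.
    val mapper = new HoodieRecord.Mapper() {
      override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
    }
    val recordItr = dataBlock.getRecordIterator(mapper)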

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 114f4c4ee1..ad21c11e9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.avro.specific.SpecificData
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -124,7 +124,10 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
       }) {
         val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
         try {
-          val recordItr = blk.getRecordIterator
+          val mapper = new HoodieRecord.Mapper() {
+            override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+          }
+          val recordItr = blk.getRecordIterator(mapper)
           try while ( {
             recordItr.hasNext
           }) {
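
A side note on the pattern above, which repeats in the next two files: if
HoodieRecord.Mapper is a single-abstract-method Java interface on this
branch (as its usage here suggests), Scala 2.12 would also accept a lambda
in place of the anonymous class, e.g.

    // Sketch only, assuming Mapper is SAM-convertible on this branch.
    val mapper: HoodieRecord.Mapper =
      (data: IndexedRecord) => new HoodieAvroIndexedRecord(data)

The explicit anonymous class used in the commit is equivalent and also
compiles under Scala 2.11, where SAM conversion is not enabled by default.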
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 3a26823ded..415c642d95 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.hudi.command.procedures
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
 import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
             }
             block match {
               case dataBlock: HoodieDataBlock =>
-                val recordItr = dataBlock.getRecordIterator
+                val mapper = new HoodieRecord.Mapper() {
+                  override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+                }
+                val recordItr = dataBlock.getRecordIterator(mapper)
                 recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
                 recordItr.close()
             }
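
One caveat about the counting loop above (a sketch, not part of the commit):
close() only runs if the iteration completes, so an exception thrown while
counting would leak the iterator. A try/finally makes the cleanup
unconditional:

    val recordItr = dataBlock.getRecordIterator(mapper)
    try {
      // Count records exactly as the procedure does.
      recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
    } finally {
      recordItr.close()
    }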
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 2806138a89..ecee96bc46 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,7 +21,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -78,7 +78,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
         .build
       scanner.asScala.foreach(hoodieRecord => {
-        val record = hoodieRecord.getData.getInsertValue(schema).get()
+        val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
         if (allRecords.size() < limit) {
           allRecords.add(record)
         }
@@ -92,10 +92,13 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
             val block = reader.next()
             block match {
               case dataBlock: HoodieDataBlock =>
-                val recordItr = dataBlock.getRecordIterator
+                val mapper = new HoodieRecord.Mapper() {
+                  override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+                }
+                val recordItr = dataBlock.getRecordIterator(mapper)
                 recordItr.asScala.foreach(record => {
                   if (allRecords.size() < limit) {
-                    allRecords.add(record)
+                    allRecords.add(record.getData.asInstanceOf[IndexedRecord])
                   }
                 })
                 recordItr.close()
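
The two casts introduced in this last file do different jobs, which is easy
to miss because they look alike. In the merged-scanner path, getData returns
the record's payload object, which under RFC-46 is no longer statically typed
as a HoodieRecordPayload, hence the cast before getInsertValue. In the
data-block path, the mapper wrapped each record as a HoodieAvroIndexedRecord,
so getData yields the Avro record itself. A sketch of both, with
hoodieRecord, record, and schema standing in for the values in the procedure:

    import org.apache.avro.generic.IndexedRecord
    import org.apache.hudi.common.model.HoodieRecordPayload

    // Scanner path: unwrap the payload, then materialize the Avro record.
    val merged = hoodieRecord.getData
      .asInstanceOf[HoodieRecordPayload[_]]
      .getInsertValue(schema)
      .get()

    // Data-block path: the HoodieAvroIndexedRecord already holds the Avro record.
    val raw: IndexedRecord = record.getData.asInstanceOf[IndexedRecord]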