You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/10/06 13:08:00 UTC
[hudi] 04/07: [HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 60edf301d7cec9f928c743f4e499389c4525e9aa
Author: komao <ma...@gmail.com>
AuthorDate: Thu Jun 30 04:29:33 2022 +0800
[HUDI-4344] fix usage of HoodieDataBlock#getRecordIterator (#6005)
Co-authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
---
.../sql/hudi/command/procedures/ExportInstantsProcedure.scala | 9 ++++++---
.../procedures/ShowHoodieLogFileMetadataProcedure.scala | 8 ++++++--
.../procedures/ShowHoodieLogFileRecordsProcedure.scala | 11 +++++++----
3 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 114f4c4ee1..ad21c11e9b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,14 +17,14 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -124,7 +124,10 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
}) {
val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
try {
- val recordItr = blk.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = blk.getRecordIterator(mapper)
try while ( {
recordItr.hasNext
}) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 3a26823ded..415c642d95 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.hudi.command.procedures
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.log.HoodieLogFormat
import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
}
block match {
case dataBlock: HoodieDataBlock =>
- val recordItr = dataBlock.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = dataBlock.getRecordIterator(mapper)
recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
recordItr.close()
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 2806138a89..ecee96bc46 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,7 +21,7 @@ import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -78,7 +78,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
.build
scanner.asScala.foreach(hoodieRecord => {
- val record = hoodieRecord.getData.getInsertValue(schema).get()
+ val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
if (allRecords.size() < limit) {
allRecords.add(record)
}
@@ -92,10 +92,13 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
val block = reader.next()
block match {
case dataBlock: HoodieDataBlock =>
- val recordItr = dataBlock.getRecordIterator
+ val mapper = new HoodieRecord.Mapper() {
+ override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+ }
+ val recordItr = dataBlock.getRecordIterator(mapper)
recordItr.asScala.foreach(record => {
if (allRecords.size() < limit) {
- allRecords.add(record)
+ allRecords.add(record.getData.asInstanceOf[IndexedRecord])
}
})
recordItr.close()