You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/11 22:13:06 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4789: [HUDI-1296] Support Metadata Table in Spark Datasource

nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r805009590



##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -18,363 +18,30 @@
 
 package org.apache.hudi
 
-import java.nio.ByteBuffer
-import java.sql.{Date, Timestamp}
-import java.time.Instant
-
-import org.apache.avro.Conversions.DecimalConversion
-import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
-import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericData.{Fixed, Record}
-import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
-import org.apache.avro.{LogicalTypes, Schema}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.avro.Schema
 import org.apache.spark.sql.types._
 
-import org.apache.hudi.AvroConversionUtils._
-import org.apache.hudi.exception.HoodieIncompatibleSchemaException
-
-import scala.collection.JavaConverters._
-
 object AvroConversionHelper {
 
-  private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = {
-    if (precision <= Decimal.MAX_LONG_DIGITS) {
-      // Constructs a `Decimal` with an unscaled `Long` value if possible.
-      Decimal(decimal.unscaledValue().longValue(), precision, scale)
-    } else {
-      // Otherwise, resorts to an unscaled `BigInteger` instead.
-      Decimal(decimal, precision, scale)
-    }
-  }
-
   /**
-    *
-    * Returns a converter function to convert row in avro format to GenericRow of catalyst.
-    *
-    * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in
-    *                         by user.
-    * @param targetSqlType    Target catalyst sql type after the conversion.
-    * @return returns a converter function to convert row in avro format to GenericRow of catalyst.
+    * @deprecated please use [[AvroConversionUtils.createRowToAvroConverter]]

Review comment:
       I guess you meant `createAvroToRowConverter`.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -82,16 +76,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))

Review comment:
       I guess this should be `READ_PRE_COMBINE_FIELD`.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, "blah").getLeft

Review comment:
       Can we fix "blah"? Should it be "files"? We are going to fetch records only from the "files" partition, right? Or could it be any partition?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -41,15 +58,91 @@ abstract class HoodieBaseRelation(
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
-  protected val tableAvroSchema: Schema = {
+  protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
+    Try(schemaUtil.getTableAvroSchema).getOrElse(
+      // If there is no commit in the table, we can't get the schema
+      // t/h [[TableSchemaResolver]], fallback to provided the [[userSchema]] instead.
+      userSchema match {
+        case Some(s) => SchemaConverters.toAvroType(s)
+        case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
+      }
+    )
   }
 
   protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
   protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
 
-  override def schema: StructType = userSchema.getOrElse(tableStructSchema)
+  override def schema: StructType = tableStructSchema
+}
+
+object HoodieBaseRelation {
+
+  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+    HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
 
+  /**
+   * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
+   * over [[InternalRow]]
+   */
+  def createBaseFileReader(spark: SparkSession,
+                           tableSchemas: HoodieTableSchemas,
+                           filters: Array[Filter],
+                           options: Map[String, String],
+                           hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val hfileReader = createHFileReader(

Review comment:
       Can we move this to the else block at L116?

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -155,8 +151,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
     // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
     // Hence we always need to deserialize in the same schema as serialized schema.
-    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
-      .mapPartitions { records =>
+    df.queryExecution.toRdd.mapPartitions { records =>

Review comment:
       Prior to this patch, records were an Iterator[Row], and now it's an Iterator[InternalRow]. So the transition did not cause any issues, did it? I ask because, in L157
   ```
   AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace), 
   ```
   1st arg is 
   ```
   AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema)
   ```
   which refers to a StructType.
   Can you help me understand? For Row we have a schema of type StructType, but for InternalRow there is no schema as such, right?
   
   
   
   

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -41,15 +58,91 @@ abstract class HoodieBaseRelation(
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
-  protected val tableAvroSchema: Schema = {
+  protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    Try (schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
+    Try(schemaUtil.getTableAvroSchema).getOrElse(
+      // If there is no commit in the table, we can't get the schema
+      // t/h [[TableSchemaResolver]], fallback to provided the [[userSchema]] instead.
+      userSchema match {
+        case Some(s) => SchemaConverters.toAvroType(s)
+        case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
+      }
+    )
   }
 
   protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
   protected val partitionColumns: Array[String] = metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
 
-  override def schema: StructType = userSchema.getOrElse(tableStructSchema)
+  override def schema: StructType = tableStructSchema
+}
+
+object HoodieBaseRelation {
+
+  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+    HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
 
+  /**
+   * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]]
+   * over [[InternalRow]]
+   */
+  def createBaseFileReader(spark: SparkSession,
+                           tableSchemas: HoodieTableSchemas,
+                           filters: Array[Filter],
+                           options: Map[String, String],
+                           hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val hfileReader = createHFileReader(
+      spark = spark,
+      tableSchemas = tableSchemas,
+      filters = filters,
+      options = options,
+      hadoopConf = hadoopConf
+    )
+    val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(

Review comment:
       Also, can we move this to L114?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,44 +89,48 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
 
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
     val (requiredAvroSchema, requiredStructSchema) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
     val fileIndex = buildFileIndex(filters)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField,
-      recordKeyFieldOpt
-    )
-    val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
+    val tableSchemas = HoodieTableSchemas(
+      tableSchema = tableStructSchema,
       partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = Seq.empty,
+      requiredSchema = requiredStructSchema,
+      tableAvroSchema = tableAvroSchema.toString,
+      requiredAvroSchema = requiredAvroSchema.toString
+    )
+    val tableState = HoodieMergeOnReadTableState(tableSchemas, fileIndex, recordKeyField, preCombineFieldOpt)
+    val fullSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      tableSchemas = tableSchemas,
+      filters = filters,
       options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = conf
     )
-
-    val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(

Review comment:
       Prior to this patch, we had some manipulation in how we were reading parquet files:
   ```
       file: PartitionedFile => {
         val iter = readParquetFile(file)
         val rows = iter.flatMap(_ match {
           case r: InternalRow => Seq(r)
           case b: ColumnarBatch => b.rowIterator().asScala
         })
         rows
       }
   ```
   This is an excerpt from HoodieDataSourceHelper.buildHoodieParquetReader.
   It looks like we need this for the vectorized reader. This is very critical for performance. Maybe we can embed this into HoodieBaseRelation.createBaseFileReader itself so that all callers benefit.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, "blah").getLeft

Review comment:
       For the last arg, can we use config.getSpillableMapBasePath?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -72,19 +66,18 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val recordKeyField = metaClient.getTableConfig.getRecordKeyFieldProp
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
-  private var recordKeyFieldOpt = Option.empty[String]
-  if (!metaClient.getTableConfig.populateMetaFields()) {
-    recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))

Review comment:
       This should be DataSourceReadOptions.READ_PRE_COMBINE_FIELD.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org