Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/18 07:09:46 UTC

[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #5885: [RFC-51][HUDI-3478] Hudi CDC

prasannarajaperumal commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r922989756


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +117,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicates whether the change data is allowed to be written out to a cdc log file.
+  protected boolean cdcEnabled = false;

Review Comment:
   Create a sub-class of HoodieAppendHandle - HoodieChangeTrackingAppendHandle - and move all the code related to persisting row-level change-tracking metadata into that subclass. I prefer naming all methods/parameters changeTracking instead of CDC: CDC is a feature, change tracking is the action you perform during the write.
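   To make the shape concrete, here is a rough sketch of the separation (the base class below is a
   stand-in, not the real HoodieAppendHandle, and every name is only illustrative):
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Stand-in for the existing write handle; the real base class keeps its current write path.
   abstract class WriteHandleSketch {
     void write(String recordKey, Object newValue) {
       // ... existing base write logic ...
     }
     void close() {
       // ... existing base close logic ...
     }
   }

   // All change-tracking state and persistence lives in the subclass, not in the base handle.
   class ChangeTrackingAppendHandleSketch extends WriteHandleSketch {
     private final List<String> changeTrackingKeys = new ArrayList<>();

     @Override
     void write(String recordKey, Object newValue) {
       super.write(recordKey, newValue);
       // capture the row-level change alongside the normal write
       changeTrackingKeys.add(recordKey);
     }

     @Override
     void close() {
       flushChangeTrackingLog();  // persist the buffered change-tracking records as a log file
       super.close();
     }

     private void flushChangeTrackingLog() {
       // ... write changeTrackingKeys (plus before/after images if logged) out, then release them ...
       changeTrackingKeys.clear();
     }
   }
   ```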



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, HoodieDataSourceHelper, HoodieTableSchema, SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.common.table.cdc.CDCFileTypeEnum._
+import org.apache.hudi.common.table.cdc.CDCUtils._
+import org.apache.hudi.common.table.cdc.CDCOperationEnum._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieCommitMetadata, HoodieFileFormat, HoodieFileGroupId, HoodieLogFile, HoodieReplaceCommitMetadata, HoodieWriteStat, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.internal.schema.InternalSchema
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+class CDCRelation(
+    override val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    cdcSupplementalLogging: Boolean,
+    startInstant: String,
+    endInstant: String,
+    options: Map[String, String]
+) extends BaseRelation with PrunedFilteredScan with Logging {
+
+  val spark: SparkSession = sqlContext.sparkSession
+
+  val fs: FileSystem = metaClient.getFs.getFileSystem
+
+  val basePath: Path = metaClient.getBasePathV2
+
+  val (tableAvroSchema, _) = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        throw new IllegalArgumentException("Failed to fetch schema from the table", e)
+    }
+    // try to find internalSchema
+    val internalSchemaFromMeta = try {
+      schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
+    (avroSchema, internalSchemaFromMeta)
+  }
+
+  val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  val commits: Map[HoodieInstant, HoodieCommitMetadata] =
+    CDCRelation.getCompletedCommitInstantInSpecifiedRange(metaClient, startInstant, endInstant)
+
+  /**
+   * Parse the commit metadata between (startInstant, endInstant], and extract the touched partitions
+   * and files to build the filesystem view.
+   */
+  lazy val fsView: HoodieTableFileSystemView = {
+    val touchedPartition = commits.flatMap { case (_, commitMetadata) =>
+      val partitionSet = commitMetadata.getPartitionToWriteStats.keySet()
+      val replacedPartitionSet = commitMetadata match {
+        case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+          replaceCommitMetadata.getPartitionToReplaceFileIds.keySet().asScala
+        case _ => Set.empty[String]
+      }
+      partitionSet.asScala ++ replacedPartitionSet
+    }.toSet
+    val touchedFiles = touchedPartition.flatMap { partition =>
+      val partitionPath = FSUtils.getPartitionPath(basePath, partition)
+      fs.listStatus(partitionPath)
+    }.toArray
+    new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline.filterCompletedInstants, touchedFiles)
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between each commit/instant and changes to this file group.
+   */
+  val changeFilesForPerFileGroupAndCommit: Map[HoodieFileGroupId, HoodieCDCFileGroupSplit] = {
+    val fgToCommitChanges = mutable.Map.empty[HoodieFileGroupId,
+      mutable.Map[HoodieInstant, ChangeFileForSingleFileGroupAndCommit]]
+
+    commits.foreach {
+      case (instant, commitMetadata) =>
+        // parse `partitionToWriteStats` in the metadata of commit
+        commitMetadata.getPartitionToWriteStats.asScala.foreach {
+          case (partition, hoodieWriteStats) =>
+            hoodieWriteStats.asScala.foreach { writeStat =>
+              val fileGroupId = new HoodieFileGroupId(partition, writeStat.getFileId)
+              // Identify the CDC source involved in this commit and
+              // determine its type for subsequent loading using different methods.
+              val changeFile = parseWriteStat(fileGroupId, instant, writeStat,
+                commitMetadata.getOperationType == WriteOperationType.DELETE)
+              if (fgToCommitChanges.contains(fileGroupId)) {
+                fgToCommitChanges(fileGroupId)(instant) = changeFile
+              } else {
+                fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+              }
+            }
+        }
+
+        // parse `partitionToReplaceFileIds` in the metadata of commit
+        commitMetadata match {
+          case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+            replaceCommitMetadata.getPartitionToReplaceFileIds.asScala.foreach {
+              case (partition, fileIds) =>
+                fileIds.asScala.foreach { fileId =>
+                  toScalaOption(fsView.fetchLatestFileSlice(partition, fileId)).foreach {
+                    fileSlice =>
+                      val fileGroupId = new HoodieFileGroupId(partition, fileId)
+                      val changeFile =
+                        ChangeFileForSingleFileGroupAndCommit(REPLACED_FILE_GROUP, null, Some(fileSlice))
+                      if (fgToCommitChanges.contains(fileGroupId)) {
+                        fgToCommitChanges(fileGroupId)(instant) = changeFile
+                      } else {
+                        fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> changeFile))
+                      }
+                  }
+                }
+            }
+          case _ =>
+        }
+      case _ =>
+    }
+    fgToCommitChanges.map { case (fgId, instantToChanges) =>
+      (fgId, HoodieCDCFileGroupSplit(instantToChanges.toArray.sortBy(_._1)))
+    }.toMap
+  }
+
+  override final def needConversion: Boolean = false
+
+  override def schema: StructType = CDCRelation.CDC_SPARK_SCHEMA
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val internalRows = buildScan0(requiredColumns, filters)
+    internalRows.asInstanceOf[RDD[Row]]
+  }
+
+  def buildScan0(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = {
+    val nameToField = schema.fields.map(f => f.name -> f).toMap
+    val requiredSchema = StructType(requiredColumns.map(nameToField))
+    val originTableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+      sparkSession = spark,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = Nil,
+      options = options,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val cdcRdd = new HoodieCDCRDD(
+      spark,
+      metaClient,
+      cdcSupplementalLogging,
+      parquetReader,
+      originTableSchema,
+      schema,
+      requiredSchema,
+      changeFilesForPerFileGroupAndCommit.values.toArray
+    )
+    cdcRdd.asInstanceOf[RDD[InternalRow]]
+  }
+
+  /**
+   * Parse a HoodieWriteStat, determine the file's type and the strategy that should be used to
+   * parse its CDC data, and then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
+   */
+  private def parseWriteStat(

Review Comment:
   Does it make sense to generalize this out of Spark, so that the logic to identify the different CDC file types and load them is common to all clients?
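   If we do, a hypothetical shape for a hudi-common helper could look like the sketch below; the
   enum mirrors CDCFileTypeEnum and the branching is only meant to show where the shared logic
   could live, not the exact conditions parseWriteStat uses today:
   ```java
   // All names and conditions here are assumptions for illustration, not the actual Hudi API.
   enum CdcFileTypeSketch { CDC_LOG_FILE, ADD_BASE_FILE, REMOVE_BASE_FILE, MOR_LOG_FILE, REPLACED_FILE_GROUP }

   final class CdcFileTypeResolverSketch {
     // Spark's parseWriteStat (and any other client) would delegate here instead of re-implementing the rules.
     static CdcFileTypeSketch resolve(String cdcPath, long numWrites, boolean isLogFile) {
       if (cdcPath != null && !cdcPath.isEmpty()) {
         return CdcFileTypeSketch.CDC_LOG_FILE;     // a dedicated cdc log file was written for this commit
       }
       if (isLogFile) {
         return CdcFileTypeSketch.MOR_LOG_FILE;     // the changes live in a normal mor log file
       }
       if (numWrites == 0) {
         return CdcFileTypeSketch.REMOVE_BASE_FILE; // empty file now, records come from the previous base file
       }
       return CdcFileTypeSketch.ADD_BASE_FILE;      // brand-new base file, every record is an insert
       // REPLACED_FILE_GROUP would still be derived from partitionToReplaceFileIds, not from a write stat.
     }
   }
   ```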



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileTypeEnum.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * This enum defines the five cdc file types. The cdc file type decides which file will be used
+ * to extract the change data, and how to do so.
+ *
+ * CDC_LOG_FILE:
+ *   For this type, there must be a real cdc log file from which we get all or part of the change
+ *   data.
+ *   When `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields of the
+ *   change data, including `op`, `ts_ms`, `before` and `after`. So we read it and return the
+ *   result directly; no other files need to be loaded.
+ *   When `hoodie.table.cdc.supplemental.logging` is false, it keeps just the `op` and the key of
+ *   the changed record. When `op` is 'i', `before` is null and the current record from the
+ *   current base/log file is used as `after`. When `op` is 'u', the previous record from the
+ *   previous file slice is used as `before`, and the current record from the current base/log
+ *   file as `after`. When `op` is 'd', the previous record from the previous file slice is used
+ *   as `before`, and `after` is null.
+ *
+ * ADD_BASE_FILE:
+ *   For this type, there must be a base file at the current instant. All the records in this
+ *   file are newly added, so we can load it, mark all the records with `i`, and treat them as
+ *   the value of `after`. The value of `before` for each record is null.
+ *
+ * REMOVE_BASE_FILE:
+ *   For this type, there must be an empty file at the current instant, but a non-empty base file
+ *   at the previous instant. First we find the base file that has the same file group and belongs
+ *   to the previous instant. Then we load it, mark all the records with `d`, and treat them as
+ *   the value of `before`. The value of `after` for each record is null.
+ *
+ * MOR_LOG_FILE:
+ *   For this type, a normal log file of a mor table is used. First we need to load the previous
+ *   file slice (including the base file and the other log files in the same file group). Then for
+ *   each record from the log file, we get its key and execute the following steps:
+ *     1) if the record is deleted,
+ *       a) if there is a record with the same key in the loaded data, `op` is 'd', `before` is
+ *          the record from the loaded data, and `after` is null;
+ *       b) if there is no record with the same key in the loaded data, just skip it.
+ *     2) if the record is not deleted,
+ *       a) if there is a record with the same key in the loaded data, `op` is 'u', `before` is
+ *          the record from the loaded data, and `after` is the current record;
+ *       b) if there is no record with the same key in the loaded data, `op` is 'i', `before` is
+ *          null, and `after` is the current record.
+ *
+ * REPLACED_FILE_GROUP:
+ *   For this type, it must be a replacecommit, such as INSERT_OVERWRITE or DROP_PARTITION, that
+ *   replaces a whole file group. First we find this file group. Then we load it, mark all the
+ *   records with `d`, and treat them as the value of `before`. The value of `after` for each
+ *   record is null.
+ */
+public enum CDCFileTypeEnum {
+
+  CDC_LOG_FILE,
+  ADD_BASE_File,

Review Comment:
   s/ADD_BASE_File/ADD_BASE_FILE
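   Unrelated to the rename: to double-check my reading of the MOR_LOG_FILE rules above, a small
   sketch of the before/after resolution (types are simplified placeholders, not the actual reader code):
   ```java
   import java.util.Map;

   final class MorLogCdcResolutionSketch {
     // Returns {op, before, after} for one log record against the loaded previous file slice,
     // or null when a delete refers to a key we never loaded (case 1b: skip).
     static Object[] resolve(String key, Object currentRecord, boolean isDelete, Map<String, Object> loadedPrevSlice) {
       Object before = loadedPrevSlice.get(key);
       if (isDelete) {
         // 1a) delete of a known key -> 'd'; 1b) delete of an unknown key -> skip
         return before != null ? new Object[] {"d", before, null} : null;
       }
       // 2a) known key -> update; 2b) unknown key -> insert
       return before != null
           ? new Object[] {"u", before, currentRecord}
           : new Object[] {"i", null, currentRecord};
     }
   }
   ```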



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   When supplemental logging is enabled, we will be holding the record data in memory until the handle is closed. Any side effects to be cautious about?
   We deflate the actual record once it's written to the file, and the bloom filter calculation happens afterwards - would there be significant memory pressure if we still hold on to the data for cdc, and how do we handle this?
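   One mitigation to consider (a sketch only; the size accounting and flushChangeBlock() are
   hypothetical): flush the cdc records in bounded chunks, or back the buffer with a spillable
   collection, instead of holding everything until close:
   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical bounded buffer for cdc records; nothing here is the real cdcData implementation.
   class BoundedCdcBufferSketch<R> {
     private final long maxBytesInMemory;
     private final List<R> buffer = new ArrayList<>();
     private long approxBytes = 0;

     BoundedCdcBufferSketch(long maxBytesInMemory) {
       this.maxBytesInMemory = maxBytesInMemory;
     }

     void add(R cdcRecord, long approxRecordBytes) {
       buffer.add(cdcRecord);
       approxBytes += approxRecordBytes;
       if (approxBytes >= maxBytesInMemory) {
         flushChangeBlock();  // append a cdc block early instead of only on close
       }
     }

     void flushChangeBlock() {
       // ... write the buffered cdc records out as a cdc data block, then release them ...
       buffer.clear();
       approxBytes = 0;
     }
   }
   ```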



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that stores the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   ChangeTrackingStat
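   For example, the new fields could be grouped into a small nested object on HoodieWriteStat,
   something like the sketch below (only cdcPath comes from this diff; anything else would be
   hypothetical):
   ```java
   import java.io.Serializable;
   import javax.annotation.Nullable;  // assuming the same @Nullable already used in this file

   // Sketch only: groups the change-tracking write stats instead of adding raw fields to HoodieWriteStat.
   public class ChangeTrackingStat implements Serializable {

     /** Relative path of the file that stores the row-level change-tracking data. */
     @Nullable
     private String cdcPath;

     @Nullable
     public String getCdcPath() {
       return cdcPath;
     }

     public void setCdcPath(@Nullable String cdcPath) {
       this.cdcPath = cdcPath;
     }
   }
   ```
   HoodieWriteStat would then carry a single nullable ChangeTrackingStat instead of individual cdc fields.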



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org