Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/01 01:51:45 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #4910: [RFC-33] [HUDI-2429][Stacked on HUDI-2560] Support full Schema evolution for Spark

vinothchandar commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r840006716



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -167,6 +167,22 @@
           + "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema "
           + "evolving records during an update.");
 
+  public static final ConfigProperty<String> INTERNAL_SCHEMA_STRING = ConfigProperty

Review comment:
       will revisit these docs.

##########
File path: pom.xml
##########
@@ -127,6 +127,8 @@
     <flink.runtime.artifactId>flink-runtime</flink.runtime.artifactId>
     <flink.table.runtime.artifactId>flink-table-runtime_${scala.binary.version}</flink.table.runtime.artifactId>
     <flink.table.planner.artifactId>flink-table-planner_${scala.binary.version}</flink.table.planner.artifactId>
+    <spark31.version>3.1.2</spark31.version>

Review comment:
       Need to ensure the pom changes are compatible with all that's gone into master.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
##########
@@ -73,7 +73,8 @@
       INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
       REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
       ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
-      REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
+      REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
+      REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION));

Review comment:
       can we call this `ALTER_SCHEMA` as well, to be consistent?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -55,6 +55,8 @@
   String COMPACTION_ACTION = "compaction";
   String REQUESTED_EXTENSION = ".requested";
   String RESTORE_ACTION = "restore";
+  // only for schema save
+  String SAVE_SCHEMA_ACTION = "schemacommit";

Review comment:
       then let's just call it SCHEMA_COMMIT_ACTION?

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieVectorizedParquetRecordReader.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Spark32HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader {

Review comment:
       oh god. we have our own reader now? :) 

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -82,21 +85,33 @@ class IncrementalRelation(val sqlContext: SQLContext,
   private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
 
   // use schema from a file produced in the end/latest instant
-  val usedSchema: StructType = {
+
+  val (usedSchema, internalSchema) = {
     log.info("Inferring schema..")
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val tableSchema = if (useEndInstantSchema) {
+    val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
+      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+    } else {
+      schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
+    }

Review comment:
       IIUC, if schema evolution is not turned on (the default), there is nothing to find in the cache, and since LATEST_SCHEMA won't be written into the commit metadata, iSchema will be null. Correct?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
##########
@@ -588,6 +589,9 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) t
         }
       }
       writeToFile(wrapperSchema, records);
+      // try to clean old history schema.
+      FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(metaClient);

Review comment:
       Carrying over the same comment: this should happen at the cleaner?

##########
File path: hudi-common/pom.xml
##########
@@ -108,6 +108,13 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>
 
+    <!-- caffeine -->
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>2.9.1</version>

Review comment:
       Pull version into root pom

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());

Review comment:
       Would this be empty if schema evol is not enabled?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -167,6 +167,22 @@
           + "implementations of HoodieRecordPayload to convert incoming records to avro. This is also used as the write schema "
           + "evolving records during an update.");
 
+  public static final ConfigProperty<String> INTERNAL_SCHEMA_STRING = ConfigProperty
+      .key("hoodie.internal.schema")
+      .noDefaultValue()
+      .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
+          + "implementations of evolution of schema");
+
+  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty

Review comment:
       This almost makes it seem like Hudi does not evolve schemas today. Could we call this `schema.on.read.enable`? (What we have today is the more conservative schema-on-write approach.) What are your thoughts?
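
       For illustration, a rough sketch of what the rename could look like, mirroring the ConfigProperty
       pattern in the hunk above (the key name and wording here are only this suggestion, not a merged config):

           import org.apache.hudi.common.config.ConfigProperty;

           // illustrative only: expose the feature as a schema-on-read flag
           public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
               .key("hoodie.schema.on.read.enable")
               .defaultValue(false)
               .withDocumentation("Enables schema-on-read style evolution (add/rename/drop columns, type changes). "
                   + "When disabled, Hudi keeps the existing, more conservative schema-on-write behavior.");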

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -241,6 +255,31 @@ protected void commit(HoodieTable table, String commitActionType, String instant
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
     finalizeWrite(table, instantTime, stats);
+    // save the internal schema so that columns added implicitly during the write process are supported

Review comment:
       Pull into a separate method?
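
       For example, something along these lines (the method name and signature are illustrative; the body
       would simply be the block that is currently inlined after finalizeWrite):

           // illustrative sketch only -- hoist the inlined schema-saving block out of commit()
           private void saveInternalSchemaIfNeeded(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
             // move the "save internal schema" block here unchanged, so commit() just reads:
             //   finalizeWrite(table, instantTime, stats);
             //   saveInternalSchemaIfNeeded(table, instantTime, metadata);
           }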

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -117,8 +120,10 @@ public abstract void preCompact(
     // log file.That is because in the case of MergeInto, the config.getSchema may not
     // the same with the table schema.
     try {
-      Schema readerSchema = schemaResolver.getTableAvroSchema(false);
-      config.setSchema(readerSchema.toString());
+      if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {

Review comment:
       this should be empty if the latest_schema key is not written at all, i.e., schema evolution is not turned on?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // Use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * Search for the internalSchema with the given versionID.
+   * First, try to get the internalSchema from the hoodie commit files; no lock is needed for that step.
+   * If that fails, try to get the internalSchema from the cache.
+   *
+   * @param versionID schema version_id to search for
+   * @param metaClient current hoodie metaClient
+   * @param cacheEnable whether the historical schema cache may be used
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient, boolean cacheEnable) {
+    Option<InternalSchema> candidateSchema = getSchemaByReadingCommitFile(versionID, metaClient);
+    if (candidateSchema.isPresent()) {
+      return candidateSchema.get();
+    }
+    if (!cacheEnable) {
+      // parse history schema and return directly
+      return InternalSchemaUtils.searchSchema(versionID, getHistoricalSchemas(metaClient));
+    }
+    String tablePath = metaClient.getBasePath();
+    // use segment lock to reduce competition.
+    synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {
+      TreeMap<Long, InternalSchema> historicalSchemas = HISTORICAL_SCHEMA_CACHE.getIfPresent(tablePath);
+      if (historicalSchemas == null || InternalSchemaUtils.searchSchema(versionID, historicalSchemas) == null) {
+        historicalSchemas = getHistoricalSchemas(metaClient);
+        HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+      } else {
+        long maxVersionId = historicalSchemas.keySet().stream().max(Long::compareTo).get();
+        if (versionID > maxVersionId) {
+          historicalSchemas = getHistoricalSchemas(metaClient);
+          HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+        }
+      }
+      return InternalSchemaUtils.searchSchema(versionID, historicalSchemas);
+    }
+  }
+
+  private static TreeMap<Long, InternalSchema> getHistoricalSchemas(HoodieTableMetaClient metaClient) {
+    TreeMap<Long, InternalSchema> result = new TreeMap<>();
+    FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
+    String historySchemaStr = schemasManager.getHistorySchemaStr();
+    if (!StringUtils.isNullOrEmpty(historySchemaStr)) {
+      result = SerDeHelper.parseSchemas(historySchemaStr);
+    }
+    return result;
+  }
+
+  private static Option<InternalSchema> getSchemaByReadingCommitFile(long versionID, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      List<HoodieInstant> instants = timeline.getInstants().filter(f -> f.getTimestamp().equals(String.valueOf(versionID))).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        return Option.empty();
+      }
+      byte[] data = timeline.getInstantDetails(instants.get(0)).get();
+      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+      String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      return SerDeHelper.fromJson(latestInternalSchemaStr);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema from commit metadata", e);
+    }
+  }
+
+  /**
+   * Get internalSchema and avroSchema for compaction/cluster operation.
+   *
+   * @param metaClient current hoodie metaClient
+   * @param compactionAndClusteringInstant first instant before current compaction/cluster instant
+   * @return (internalSchemaStrOpt, avroSchemaStrOpt) a pair of InternalSchema/avroSchema
+   */
+  public static Pair<Option<String>, Option<String>> getInternalSchemaAndAvroSchemaForClusteringAndCompaction(HoodieTableMetaClient metaClient, String compactionAndClusteringInstant) {
+    // try to load internalSchema to support Schema Evolution
+    HoodieTimeline timelineBeforeCurrentCompaction = metaClient.getCommitsAndCompactionTimeline().findInstantsBefore(compactionAndClusteringInstant).filterCompletedInstants();
+    Option<HoodieInstant> lastInstantBeforeCurrentCompaction =  timelineBeforeCurrentCompaction.lastInstant();
+    if (lastInstantBeforeCurrentCompaction.isPresent()) {
+      // try to find internalSchema
+      byte[] data = timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
+      HoodieCommitMetadata metadata;
+      try {
+        metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+      } catch (Exception e) {
+        throw new HoodieException(String.format("cannot read metadata from commit: %s", lastInstantBeforeCurrentCompaction.get()), e);
+      }
+      String internalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      if (internalSchemaStr != null) {
+        String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+        return Pair.of(Option.of(internalSchemaStr), Option.of(existingSchemaStr));
+      }
+    }
+    return Pair.of(Option.empty(), Option.empty());
+  }
+
+  /**
+   * Given a schema versionId, return its internalSchema.
+   * This method is called from spark tasks, so we should minimize its cost.
+   * We avoid the metaClient as much as possible, since initializing a metaClient is expensive.
+   * Step 1:
+   * try to parse the internalSchema directly from the matching HoodieInstant file.
+   * Step 2:
+   * if the internalSchema cannot be parsed in step 1,
+   * try to find it in the history schema files.
+   *
+   * @param versionId the internalSchema version to search for.
+   * @param tablePath table path
+   * @param hadoopConf conf
+   * @param validCommits currently valid commits, used to build the commit file paths and to verify the validity of the history schema files
+   * @return an internalSchema.
+   */
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
+    Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
+    List<String> validateCommitList = commitSet.stream().map(fileName -> {
+      String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
+      return fileName.replace(fileExtension, "");
+    }).collect(Collectors.toList());
+
+    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+    Path hoodieMetaPath = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    //step1:
+    Path candidateCommitFile = commitSet.stream().filter(fileName -> {
+      String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
+      return fileName.replace(fileExtension, "").equals(versionId + "");
+    }).findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
+    if (candidateCommitFile != null) {
+      try {
+        byte[] data;
+        try (FSDataInputStream is = fs.open(candidateCommitFile)) {
+          data = FileIOUtils.readAsByteArray(is);
+        } catch (IOException e) {
+          throw e;
+        }
+        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+        String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+        if (latestInternalSchemaStr != null) {
+          return SerDeHelper.fromJson(latestInternalSchemaStr).orElse(null);
+        }
+      } catch (Exception e1) {
+        // swallow this exception.

Review comment:
       log something?
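
       e.g. something like the following, assuming a plain log4j logger (the logger placement and the
       message are illustrative only):

           import org.apache.hadoop.fs.Path;
           import org.apache.log4j.LogManager;
           import org.apache.log4j.Logger;

           // illustrative only: the shape of the suggested change inside getInternalSchemaByVersionId
           public class InternalSchemaCacheLoggingSketch {
             private static final Logger LOG = LogManager.getLogger(InternalSchemaCacheLoggingSketch.class);

             static void onCommitFileReadFailure(Path candidateCommitFile, Exception e) {
               // keep falling back to the history schema files, but record why the commit file was skipped
               LOG.warn("Failed to read internal schema from commit file " + candidateCommitFile, e);
             }
           }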

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
##########
@@ -65,7 +65,7 @@ private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, Stri
                                               Option<InstantRange> instantRange, boolean enableFullScan) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
         spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false,
-        enableFullScan, Option.of(partitionName));
+        enableFullScan, Option.of(partitionName), null);

Review comment:
       don't love passing in `null` as a sentinel value. Change to empty?
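
       e.g., depending on the parameter type (not visible in this hunk), either of these avoids the
       sentinel; `getEmptyInternalSchema` is assumed to be the empty-schema factory this PR uses elsewhere:

           import org.apache.hudi.common.util.Option;
           import org.apache.hudi.internal.schema.InternalSchema;

           // illustrative only
           class NoSentinelSketch {
             static Option<InternalSchema> ifTheParamIsAnOption() {
               return Option.empty();                          // pass an empty Option instead of null
             }

             static InternalSchema ifTheParamIsAPlainSchema() {
               return InternalSchema.getEmptyInternalSchema(); // or an explicit "empty" schema instance
             }
           }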

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
##########
@@ -109,6 +114,9 @@
   private final FileSystem fs;
   // Total log files read - for metrics
   private AtomicLong totalLogFiles = new AtomicLong(0);
+  // Internal schema
+  private InternalSchema internalSchema;
+  private final String path;

Review comment:
       add javadocs for this?
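
       e.g. something along these lines -- the wording is a guess at the intent, to be corrected by the author:

           /** InternalSchema used for schema-on-read evolution while scanning log blocks; may be empty when evolution is off. */
           private InternalSchema internalSchema;

           /** Base path of the table, used to locate historical schema files for log blocks written under older schemas. */
           private final String path;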

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+class Spark32HoodieParquetFileFormat extends ParquetFileFormat {
+
+  // reference ParquetFileFormat from spark project
+  override def buildReaderWithPartitionValues(
+                                               sparkSession: SparkSession,
+                                               dataSchema: StructType,
+                                               partitionSchema: StructType,
+                                               requiredSchema: StructType,
+                                               filters: Seq[Filter],
+                                               options: Map[String, String],
+                                               hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    if (hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, "").isEmpty) {
+      // fallback to origin parquet File read
+      super.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)

Review comment:
       Okay! This should preserve existing perf.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
##########
@@ -60,6 +60,12 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
                                     tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieUnsafeRDD = {
+    if (!internalSchema.isEmptySchema) {
+      // it is safe to enable vectorizedReader
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "false")

Review comment:
       what's the impact of this?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -110,15 +110,15 @@ object AlterHoodieTableAddColumnsCommand {
       HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
     )
 
-    val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, hoodieCatalogTable.tableType)
+    val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.tableType)

Review comment:
       does this work even if the writer did not originally turn on schema evolution for the table? Say it was created using the DeltaStreamer tool.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -315,6 +329,32 @@ object HoodieSparkSqlWriter {
     processedRecord
   }
 
+  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+    val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
+    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
+      HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+  }
+
+  /**
+    * Get the latest internalSchema of the table.
+    *
+    * @param fs           instance of FileSystem.
+    * @param basePath     base path.
+    * @param sparkContext instance of spark context.
+    * @return the latest InternalSchema of the table, if present.
+    */
+  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
+    if (FSUtils.isTableExists(basePath.toString, fs)) {

Review comment:
       can we handle the exception and return None, as opposed to checking `isTableExists`?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -46,7 +49,7 @@ object HoodieDataSourceHelper extends PredicateHelper {
                                options: Map[String, String],
                                hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
 
-    val readParquetFile: PartitionedFile => Iterator[Any] = new ParquetFileFormat().buildReaderWithPartitionValues(
+    val readParquetFile: PartitionedFile => Iterator[Any] = sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(

Review comment:
        I understand you are subclassing the original class. Have you verified there are no performance differences with the file format switch?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -55,6 +55,8 @@
   String COMPACTION_ACTION = "compaction";
   String REQUESTED_EXTENSION = ".requested";
   String RESTORE_ACTION = "restore";
+  // only for schema save
+  String SAVE_SCHEMA_ACTION = "schemacommit";

Review comment:
       Should the rest of the code treat this as a write? i.e., should we include this in the getWriteTimeline() method?
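
       To make the question concrete -- assuming getWriteTimeline() filters instants by an allow-list of
       actions (the actual implementation may differ), the question is whether the new action belongs in it:

           import java.util.Arrays;
           import java.util.HashSet;
           import java.util.Set;
           import org.apache.hudi.common.table.timeline.HoodieTimeline;

           // illustrative only: would schemacommit count as a "write" action?
           class WriteTimelineActionsSketch {
             static final Set<String> WRITE_ACTIONS = new HashSet<>(Arrays.asList(
                 HoodieTimeline.COMMIT_ACTION,
                 HoodieTimeline.DELTA_COMMIT_ACTION,
                 HoodieTimeline.COMPACTION_ACTION,
                 HoodieTimeline.REPLACE_COMMIT_ACTION,
                 HoodieTimeline.SAVE_SCHEMA_ACTION)); // <- include the new action here, or not?
           }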

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -61,6 +61,11 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
                                     tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
                                     filters: Array[Filter]): HoodieMergeOnReadRDD = {
+    if (!internalSchema.isEmptySchema) {

Review comment:
       pull this into a method and share across the different classes?

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieVectorizedParquetRecordReader.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Spark32HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader {

Review comment:
       but we just seem to be wrapping the super class 

##########
File path: hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieVectorizedParquetRecordReader.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Spark32HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader {

Review comment:
       how is this wired to the file format and invoked?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -78,4 +81,11 @@ object HoodieDataSourceHelper extends PredicateHelper {
     }
   }
 
+  def getConfigurationForInternalSchema(conf: Configuration, internalSchema: InternalSchema, tablePath: String, validCommits: String): Configuration = {

Review comment:
       add docs to explain why we do this
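
       Roughly, based on how these conf keys are consumed by the readers above, the doc might say
       something like this (a guess, to be corrected by the author):

           /**
            * Copies the query InternalSchema, the table path and the set of valid commit files into the
            * Hadoop Configuration, so that executor-side parquet readers can resolve the correct schema
            * version per file without having to instantiate a HoodieTableMetaClient.
            */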




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org