You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2022/06/26 03:27:21 UTC

[hudi] branch master updated: [HUDI-3502] Support hdfs parquet import command based on Call Produce Command (#5956)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c43c590ac [HUDI-3502] Support hdfs parquet import command based on Call Produce Command (#5956)
1c43c590ac is described below

commit 1c43c590aca7ac13f474eb5818b6f3859c5dbf40
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Sun Jun 26 11:27:14 2022 +0800

    [HUDI-3502] Support hdfs parquet import command based on Call Produce Command (#5956)
---
 .../apache/hudi/cli/HDFSParquetImporterUtils.java  | 325 +++++++++++++++++++++
 .../procedures/HdfsParquetImportProcedure.scala    |  85 ++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedure/TestHdfsParquetImportProcedure.scala | 202 +++++++++++++
 4 files changed, 613 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
new file mode 100644
index 0000000000..6937a3389b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.HoodieJsonPayload;
+import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Loads data from Parquet Sources.
+ */
+public class HDFSParquetImporterUtils implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HDFSParquetImporterUtils.class);
+  private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
+      .withZone(ZoneId.systemDefault());
+
+  private final String command;
+  private final String srcPath;
+  private final String targetPath;
+  private final String tableName;
+  private final String tableType;
+  private final String rowKey;
+  private final String partitionKey;
+  private final int parallelism;
+  private final String schemaFile;
+  private int retry;
+  private final String propsFilePath;
+  private final List<String> configs = new ArrayList<>();
+  private TypedProperties props;
+
+  public HDFSParquetImporterUtils(
+      String command,
+      String srcPath,
+      String targetPath,
+      String tableName,
+      String tableType,
+      String rowKey,
+      String partitionKey,
+      int parallelism,
+      String schemaFile,
+      int retry,
+      String propsFilePath) {
+    this.command = command;
+    this.srcPath = srcPath;
+    this.targetPath = targetPath;
+    this.tableName = tableName;
+    this.tableType = tableType;
+    this.rowKey = rowKey;
+    this.partitionKey = partitionKey;
+    this.parallelism = parallelism;
+    this.schemaFile = schemaFile;
+    this.retry = retry;
+    this.propsFilePath = propsFilePath;
+  }
+
+  public boolean isUpsert() {
+    return "upsert".equalsIgnoreCase(this.command);
+  }
+
+  public int dataImport(JavaSparkContext jsc) {
+    FileSystem fs = FSUtils.getFs(this.targetPath, jsc.hadoopConfiguration());
+    this.props = this.propsFilePath == null || this.propsFilePath.isEmpty() ? buildProperties(this.configs)
+        : readConfig(fs.getConf(), new Path(this.propsFilePath), this.configs).getProps(true);
+    LOG.info("Starting data import with configs : " + props.toString());
+    int ret = -1;
+    try {
+      // Verify that targetPath is not present.
+      if (fs.exists(new Path(this.targetPath)) && !isUpsert()) {
+        throw new HoodieIOException(String.format("Make sure %s is not present.", this.targetPath));
+      }
+      do {
+        ret = dataImport(jsc, fs);
+      } while (ret != 0 && retry-- > 0);
+    } catch (Throwable t) {
+      LOG.error("dataImport failed", t);
+    }
+    return ret;
+  }
+
+  public int dataImport(JavaSparkContext jsc, FileSystem fs) {
+    try {
+      if (fs.exists(new Path(this.targetPath)) && !isUpsert()) {
+        // cleanup target directory.
+        fs.delete(new Path(this.targetPath), true);
+      }
+
+      if (!fs.exists(new Path(this.targetPath))) {
+        // Initialize target hoodie table.
+        Properties properties = HoodieTableMetaClient.withPropertyBuilder()
+            .setTableName(this.tableName)
+            .setTableType(this.tableType)
+            .build();
+        HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), this.targetPath, properties);
+      }
+
+      // Get schema.
+      String schemaStr = parseSchema(fs, this.schemaFile);
+
+      SparkRDDWriteClient<HoodieRecordPayload> client =
+          createHoodieClient(jsc, this.targetPath, schemaStr, this.parallelism, Option.empty(), props);
+
+      JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
+      // Get instant time.
+      String instantTime = client.startCommit();
+      JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
+      return handleErrors(jsc, instantTime, writeResponse);
+    } catch (Throwable t) {
+      LOG.error("Error occurred.", t);
+    }
+    return -1;
+  }
+
+  public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
+                                                                                String schemaStr) throws IOException {
+    Job job = Job.getInstance(jsc.hadoopConfiguration());
+    // Allow recursive directories to be found
+    job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    // To parallelize reading file status.
+    job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
+    AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
+    ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
+
+    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName);
+    return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+            job.getConfiguration())
+        // To reduce large number of tasks.
+        .coalesce(16 * this.parallelism).map(entry -> {
+          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+          Object partitionField = genericRecord.get(this.partitionKey);
+          if (partitionField == null) {
+            throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
+          }
+          Object rowField = genericRecord.get(this.rowKey);
+          if (rowField == null) {
+            throw new HoodieIOException("row field is missing. :" + this.rowKey);
+          }
+          String partitionPath = partitionField.toString();
+          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+          if (partitionField instanceof Number) {
+            try {
+              long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+            } catch (NumberFormatException nfe) {
+              LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+            }
+          }
+          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+              new HoodieJsonPayload(genericRecord.toString()));
+        });
+  }
+
+  /**
+   * Imports records to Hoodie table.
+   *
+   * @param client        Hoodie Client
+   * @param instantTime   Instant Time
+   * @param hoodieRecords Hoodie Records
+   * @param <T>           Type
+   */
+  public <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(SparkRDDWriteClient<T> client, String instantTime,
+                                                                   JavaRDD<HoodieRecord<T>> hoodieRecords) {
+    switch (this.command.toLowerCase()) {
+      case "upsert": {
+        return client.upsert(hoodieRecords, instantTime);
+      }
+      case "bulkinsert": {
+        return client.bulkInsert(hoodieRecords, instantTime);
+      }
+      default: {
+        return client.insert(hoodieRecords, instantTime);
+      }
+    }
+  }
+
+  public static TypedProperties buildProperties(List<String> props) {
+    TypedProperties properties = DFSPropertiesConfiguration.getGlobalProps();
+    props.forEach(x -> {
+      String[] kv = x.split("=");
+      ValidationUtils.checkArgument(kv.length == 2);
+      properties.setProperty(kv[0], kv[1]);
+    });
+    return properties;
+  }
+
+  public static DFSPropertiesConfiguration readConfig(Configuration hadoopConfig, Path cfgPath, List<String> overriddenProps) {
+    DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(hadoopConfig, cfgPath);
+    try {
+      if (!overriddenProps.isEmpty()) {
+        LOG.info("Adding overridden properties to file properties.");
+        conf.addPropsFromStream(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unexpected error adding config overrides", ioe);
+    }
+
+    return conf;
+  }
+
+  /**
+   * Build Hoodie write client.
+   *
+   * @param jsc         Java Spark Context
+   * @param basePath    Base Path
+   * @param schemaStr   Schema
+   * @param parallelism Parallelism
+   */
+  public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
+                                                                            int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
+    HoodieCompactionConfig compactionConfig = compactionStrategyClass
+        .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
+            .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+        .orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withParallelism(parallelism, parallelism)
+            .withBulkInsertParallelism(parallelism)
+            .withDeleteParallelism(parallelism)
+            .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
+            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+            .withProps(properties).build();
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config);
+  }
+
+  /**
+   * Parse Schema from file.
+   *
+   * @param fs         File System
+   * @param schemaFile Schema File
+   */
+  public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
+    // Read schema file.
+    Path p = new Path(schemaFile);
+    if (!fs.exists(p)) {
+      throw new Exception(String.format("Could not find - %s - schema file.", schemaFile));
+    }
+    long len = fs.getFileStatus(p).getLen();
+    ByteBuffer buf = ByteBuffer.allocate((int) len);
+    try (FSDataInputStream inputStream = fs.open(p)) {
+      inputStream.readFully(0, buf.array(), 0, buf.array().length);
+    }
+    return new String(buf.array(), StandardCharsets.UTF_8);
+  }
+
+  public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD<WriteStatus> writeResponse) {
+    LongAccumulator errors = jsc.sc().longAccumulator();
+    writeResponse.foreach(writeStatus -> {
+      if (writeStatus.hasErrors()) {
+        errors.add(1);
+        LOG.error(String.format("Error processing records :writeStatus:%s", writeStatus.getStat().toString()));
+      }
+    });
+    if (errors.value() == 0) {
+      LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
+      return 0;
+    }
+    LOG.error(String.format("Import failed with %d errors.", errors.value()));
+    return -1;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
new file mode 100644
index 0000000000..1589d230ce
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HdfsParquetImportProcedure.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.cli.HDFSParquetImporterUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.language.higherKinds
+
+class HdfsParquetImportProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "tableType", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "srcPath", DataTypes.StringType, None),
+    ProcedureParameter.required(3, "targetPath", DataTypes.StringType, None),
+    ProcedureParameter.required(4, "rowKey", DataTypes.StringType, None),
+    ProcedureParameter.required(5, "partitionKey", DataTypes.StringType, None),
+    ProcedureParameter.required(6, "schemaFilePath", DataTypes.StringType, None),
+    ProcedureParameter.optional(7, "format", DataTypes.StringType, "parquet"),
+    ProcedureParameter.optional(8, "command", DataTypes.StringType, "insert"),
+    ProcedureParameter.optional(9, "retry", DataTypes.IntegerType, 0),
+    ProcedureParameter.optional(10, "parallelism", DataTypes.IntegerType, jsc.defaultParallelism),
+    ProcedureParameter.optional(11, "propsFilePath", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("import_result", DataTypes.IntegerType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val tableType = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val targetPath = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val rowKey = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val partitionKey = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val schemaFilePath = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val format = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+    val command = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String]
+    val retry = getArgValueOrDefault(args, PARAMETERS(9)).get.asInstanceOf[Int]
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(10)).get.asInstanceOf[Int]
+    val propsFilePath = getArgValueOrDefault(args, PARAMETERS(11)).get.asInstanceOf[String]
+
+    val parquetImporterUtils: HDFSParquetImporterUtils = new HDFSParquetImporterUtils(command, srcPath, targetPath,
+      tableName, tableType, rowKey, partitionKey, parallelism, schemaFilePath, retry, propsFilePath)
+
+    Seq(Row(parquetImporterUtils.dataImport(jsc)))
+  }
+
+  override def build = new HdfsParquetImportProcedure()
+}
+
+object HdfsParquetImportProcedure {
+  val NAME = "hdfs_parquet_import"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new HdfsParquetImportProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 7f29f4b86f..33ca211b03 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -54,6 +54,7 @@ object HoodieProcedures {
     mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
     mapBuilder.put(StatsWriteAmplificationProcedure.NAME, StatsWriteAmplificationProcedure.builder)
     mapBuilder.put(StatsFileSizeProcedure.NAME, StatsFileSizeProcedure.builder)
+    mapBuilder.put(HdfsParquetImportProcedure.NAME, HdfsParquetImportProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala
new file mode 100644
index 0000000000..90e6164c10
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHdfsParquetImportProcedure.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
+import org.apache.hudi.testutils.HoodieClientTestUtils
+import org.apache.parquet.avro.AvroParquetWriter
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+import org.junit.jupiter.api.Assertions.assertTrue
+
+import java.io.IOException
+import java.util
+import java.util.Objects
+import java.util.concurrent.TimeUnit
+
+class TestHdfsParquetImportProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call hdfs_parquet_import Procedure with insert operation") {
+    withTempDir { tmp =>
+      val fs: FileSystem = FSUtils.getFs(tmp.getCanonicalPath, spark.sparkContext.hadoopConfiguration)
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + Path.SEPARATOR + tableName
+      val sourcePath = new Path(tmp.getCanonicalPath, "source")
+      val targetPath = new Path(tablePath)
+      val schemaFile = new Path(tmp.getCanonicalPath, "file.schema").toString
+
+      // create schema file
+      val schemaFileOS = fs.create(new Path(schemaFile))
+      try schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes)
+      finally if (schemaFileOS != null) schemaFileOS.close()
+
+      val insertData: util.List[GenericRecord] = createInsertRecords(sourcePath)
+
+      // Check required fields
+      checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+        s"Argument: table is required")
+
+      checkAnswer(
+        s"""call hdfs_parquet_import(
+           |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |srcPath => '$sourcePath', targetPath => '$targetPath',
+           |rowKey => '_row_key', partitionKey => 'timestamp',
+           |schemaFilePath => '$schemaFile')""".stripMargin) {
+        Seq(0)
+      }
+
+      verifyResultData(insertData, fs, tablePath)
+    }
+  }
+
+  test("Test Call hdfs_parquet_import Procedure with upsert operation") {
+    withTempDir { tmp =>
+      val fs: FileSystem = FSUtils.getFs(tmp.getCanonicalPath, spark.sparkContext.hadoopConfiguration)
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + Path.SEPARATOR + tableName
+      val sourcePath = new Path(tmp.getCanonicalPath, "source")
+      val targetPath = new Path(tablePath)
+      val schemaFile = new Path(tmp.getCanonicalPath, "file.schema").toString
+
+      // create schema file
+      val schemaFileOS = fs.create(new Path(schemaFile))
+      try schemaFileOS.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA.getBytes)
+      finally if (schemaFileOS != null) schemaFileOS.close()
+
+      val insertData: util.List[GenericRecord] = createUpsertRecords(sourcePath)
+
+      // Check required fields
+      checkExceptionContain(s"""call hdfs_parquet_import(tableType => 'mor')""")(
+        s"Argument: table is required")
+
+      checkAnswer(
+        s"""call hdfs_parquet_import(
+           |table => '$tableName', tableType => '${HoodieTableType.COPY_ON_WRITE.name}',
+           |srcPath => '$sourcePath', targetPath => '$targetPath',
+           |rowKey => '_row_key', partitionKey => 'timestamp',
+           |schemaFilePath => '$schemaFile', command => 'upsert')""".stripMargin) {
+        Seq(0)
+      }
+
+      verifyResultData(insertData, fs, tablePath)
+    }
+  }
+
+  @throws[ParseException]
+  @throws[IOException]
+  def createInsertRecords(srcFolder: Path): util.List[GenericRecord] = {
+    import scala.collection.JavaConversions._
+    val srcFile: Path = new Path(srcFolder.toString, "file1.parquet")
+    val startTime: Long = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime / 1000
+    val records: util.List[GenericRecord] = new util.ArrayList[GenericRecord]
+    for (recordNum <- 0 until 96) {
+      records.add(new HoodieTestDataGenerator().generateGenericRecord(recordNum.toString,
+        "0", "rider-" + recordNum, "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    try {
+      val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builder[GenericRecord](srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf).build
+      try {
+        for (record <- records) {
+          writer.write(record)
+        }
+      } finally {
+        if (writer != null) writer.close()
+      }
+    }
+    records
+  }
+
+  @throws[ParseException]
+  @throws[IOException]
+  def createUpsertRecords(srcFolder: Path): util.List[GenericRecord] = {
+    import scala.collection.JavaConversions._
+    val srcFile = new Path(srcFolder.toString, "file1.parquet")
+    val startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime / 1000
+    val records = new util.ArrayList[GenericRecord]
+    // 10 for update
+    val dataGen = new HoodieTestDataGenerator
+    for (recordNum <- 0 until 11) {
+      records.add(dataGen.generateGenericRecord(recordNum.toString, "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    // 4 for insert
+    for (recordNum <- 96 until 100) {
+      records.add(dataGen.generateGenericRecord(recordNum.toString, "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)))
+    }
+    try {
+      val writer = AvroParquetWriter.builder[GenericRecord](srcFile).withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf).build
+      try {
+        for (record <- records) {
+          writer.write(record)
+        }
+      } finally {
+        if (writer != null) writer.close()
+      }
+    }
+    records
+  }
+
+  private def verifyResultData(expectData: util.List[GenericRecord], fs: FileSystem, tablePath: String): Unit = {
+    import scala.collection.JavaConversions._
+    val jsc = new JavaSparkContext(spark.sparkContext)
+    val ds = HoodieClientTestUtils.read(jsc, tablePath, spark.sqlContext, fs, tablePath + "/*/*/*/*")
+    val readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList()
+    val result = readData.toList.map((row: Row) =>
+      new HoodieTripModel(row.getLong(0), row.getString(1),
+        row.getString(2), row.getString(3), row.getDouble(4), row.getDouble(5), row.getDouble(6), row.getDouble(7))
+    )
+    val expected = expectData.toList.map((g: GenericRecord) => new HoodieTripModel(Long.unbox(g.get("timestamp")),
+      g.get("_row_key").toString, g.get("rider").toString, g.get("driver").toString, g.get("begin_lat").toString.toDouble,
+      g.get("begin_lon").toString.toDouble, g.get("end_lat").toString.toDouble, g.get("end_lon").toString.toDouble))
+
+    assertTrue(expected.size == result.size || (result.containsAll(expected) && expected.containsAll(result)))
+  }
+
+  class HoodieTripModel(
+     var timestamp: Long,
+     var rowKey: String,
+     var rider: String,
+     var driver: String,
+     var beginLat: Double,
+     var beginLon: Double,
+     var endLat: Double,
+     var endLon: Double) {
+    override def equals(o: Any): Boolean = {
+      if (this == o) {
+        true
+      } else if (o == null || (getClass ne o.getClass)) {
+        false
+      } else {
+        val other = o.asInstanceOf[HoodieTripModel]
+        timestamp == other.timestamp && rowKey == other.rowKey && rider == other.rider &&
+          driver == other.driver && beginLat == other.beginLat && beginLon == other.beginLon &&
+          endLat == other.endLat && endLon == other.endLon
+      }
+    }
+
+    override def hashCode: Int = Objects.hashCode(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon)
+  }
+}