Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/11 19:33:33 UTC

[GitHub] [hudi] yihua commented on a change in pull request #4782: [HUDI-3398] TableSchemaResolver may fail for metadata table

yihua commented on a change in pull request #4782:
URL: https://github.com/apache/hudi/pull/4782#discussion_r804927321



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -115,6 +121,21 @@ private MessageType getTableParquetSchemaFromDataFile() {
     }
   }
 
+  private MessageType getMessageType(String filePath) throws IOException {

Review comment:
       nit: rename to `readSchemaFromBaseFile`?
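
   A minimal sketch of how the renamed method might look, assuming it keeps dispatching on the base-file extension. `readSchemaFromParquetBaseFile` below is a hypothetical stand-in for the existing parquet reader (`readSchemaFromBaseFile(Path)`); the HFile/ORC helpers are the ones this PR adds:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.parquet.schema.MessageType;

// Sketch only: dispatch to the format-specific schema readers by extension.
private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
  if (filePath.endsWith(HoodieFileFormat.HFILE.getFileExtension())) {
    return readSchemaFromHFileBaseFile(new Path(filePath));
  } else if (filePath.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
    return readSchemaFromORCBaseFile(new Path(filePath));
  } else {
    // .parquet base files keep going through the existing parquet footer path.
    return readSchemaFromParquetBaseFile(new Path(filePath));
  }
}
```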

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -422,6 +443,55 @@ public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOExcepti
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(hFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + hFilePath + ". File does not exist.");
+    }
+
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    HoodieHFileReader<IndexedRecord> hFileReader = new HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig);
+
+    return convertAvroSchemaToParquet(hFileReader.getSchema());
+  }
+
+
+  /**
+   * Read the parquet schema from an ORC file.
+   */
+  public MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException {

Review comment:
       After looking at our codebase, it would be better to reuse `BaseFileUtils` or `HoodieFileReaderFactory` to abstract out the format-specific logic here.  `BaseFileUtils::readAvroSchema` can be used to read the schema.
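
   A rough sketch of that reuse (assuming `BaseFileUtils.getInstance(path)` resolves the format-specific implementation from the file extension; HFile coverage there is an assumption, not a given):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.parquet.schema.MessageType;

// Sketch, inside TableSchemaResolver: one entry point for all base file formats.
private MessageType readSchemaFromBaseFile(Path filePath) throws IOException {
  // BaseFileUtils picks ParquetUtils / OrcUtils / ... based on the extension.
  Schema avroSchema = BaseFileUtils.getInstance(filePath.toString())
      .readAvroSchema(metaClient.getHadoopConf(), filePath);
  return convertAvroSchemaToParquet(avroSchema);
}
```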

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -422,6 +443,55 @@ public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOExcepti
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(hFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + hFilePath + ". File does not exist.");
+    }
+
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    HoodieHFileReader<IndexedRecord> hFileReader = new HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig);
+
+    return convertAvroSchemaToParquet(hFileReader.getSchema());

Review comment:
       @nsivabalan @manojpec Is this the most efficient way of reading the schema from an HFile?
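
   For comparison, a sketch that goes through `HoodieFileReaderFactory` instead, so the HFile-specific setup (`CacheConfig` etc.) stays out of this class. Whether it saves any I/O depends on how lazily `HoodieHFileReader` loads the file-info block:

```java
import org.apache.avro.Schema;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.parquet.schema.MessageType;

// Sketch: the factory resolves the reader from the path extension; getSchema()
// should only need the HFile trailer/file-info block, not the data blocks.
Schema avroSchema = HoodieFileReaderFactory
    .getFileReader(metaClient.getHadoopConf(), hFilePath)
    .getSchema();
MessageType parquetSchema = convertAvroSchemaToParquet(avroSchema);
```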

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolver.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for SparkSqlWriter class.

Review comment:
       nit: fix scaladocs.

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolver.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for SparkSqlWriter class.
+ */
+class TestTableSchemaResolver {

Review comment:
       Put this class in the same package, `org.apache.hudi.common.table`?

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolver.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for SparkSqlWriter class.
+ */
+class TestTableSchemaResolver {

Review comment:
       It's good that you add functional tests for the TableSchemaResolver.  Could you annotate it as `@Tag("functional")` and rename it to `TestTableSchemaResolverWithSparkSQL`?
   
   Besides this, could you add unit tests in another Java class covering the schema read methods for the different file formats?  Or do you think this is not necessary once `BaseFileUtils` is reused?
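
   Roughly (sketch; class name per the suggestion above):

```scala
import org.junit.jupiter.api.Tag

@Tag("functional")
class TestTableSchemaResolverWithSparkSQL {
  // ... existing setup and tests unchanged ...
}
```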

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolver.scala
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for SparkSqlWriter class.
+ */
+class TestTableSchemaResolver {
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+  var tempPath: java.nio.file.Path = _
+  var tempBootStrapPath: java.nio.file.Path = _
+  var hoodieFooTableName = "hoodie_foo_tbl"
+  var tempBasePath: String = _
+  var commonTableModifier: Map[String, String] = Map()
+  case class StringLongTest(uuid: String, ts: Long)
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach
+  def setUp(): Unit = {
+    initSparkContext()
+    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+    tempBasePath = tempPath.toAbsolutePath.toString
+    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  /**
+   * Tear down method running after each test.
+   */
+  @AfterEach
+  def tearDown(): Unit = {
+    cleanupSparkContexts()
+    FileUtils.deleteDirectory(tempPath.toFile)
+    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+  }
+
+  /**
+   * Utility method for initializing the spark context.
+   */
+  def initSparkContext(): Unit = {
+    spark = SparkSession.builder()
+      .appName(hoodieFooTableName)
+      .master("local[2]")
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  /**
+   * Utility method for cleaning up spark resources.
+   */
+  def cleanupSparkContexts(): Unit = {
+    if (sqlContext != null) {
+      sqlContext.clearCache()
+      sqlContext = null
+    }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    if (spark != null) {
+      spark.close()
+    }
+  }
+
+  /**
+   * Utility method for creating common params for writer.
+   *
+   * @param path               Path for hoodie table
+   * @param hoodieFooTableName Name of hoodie table
+   * @param tableType          Type of table
+   * @return                   Map of common params
+   */
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
+  /**
+   * Utility method for converting list of Row to list of Seq.
+   *
+   * @param inputList list of Row
+   * @return list of Seq
+   */
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+  @Test
+  def testTableSchemaResolverInMetadataTable(): Unit = {
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    // create a new table
+    val tableName = hoodieFooTableName
+    val fooTableModifier = Map("path" -> tempPath.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.avro.schema" -> schema.toString(),
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
+      "hoodie.metadata.compact.max.delta.commits" -> "2",
+      HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key -> "true"
+    )
+
+    // generate the inserts
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(10)
+    val recordsSeq = convertRowListToSeq(records)
+    val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
+
+    // do update
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df1)
+
+    val metadataTablePath = tempPath.toAbsolutePath.toString + "/.hoodie/metadata"
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(metadataTablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+
+    // Delete latest metadata table deltacommit
+    // Get schema from metadata table hfile format base file.
+    val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline.getReverseOrderedInstants.findFirst()
+    val path = new Path(metadataTablePath + "/.hoodie", latestInstant.get().getFileName)
+    val fs = path.getFileSystem(new Configuration())
+    fs.delete(path, false)
+    metaClient.reloadActiveTimeline()
+
+    var ori: Exception = null
+    try {
+      val schemaFromData = new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile
+      val structFromData = AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData))
+      val schemeDesign = new Schema.Parser().parse(HoodieMetadataRecord.getClassSchema.toString())
+      val structDesign = AvroConversionUtils.convertAvroSchemaToStructType(schemeDesign)
+      assertEquals(structFromData, structDesign)
+    } catch {
+      case e: Exception => ori = e;
+    }
+    assert(ori == null)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("COPY_ON_WRITE,parquet","COPY_ON_WRITE,orc","MERGE_ON_READ,parquet","MERGE_ON_READ,orc"))

Review comment:
       Add one for `MERGE_ON_READ,hfile` as well to be explicit?  I know that with the metadata table enabled, HFile is already exercised, yet it's still better to have an explicit HFile test for the data table.
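
   i.e. something like (sketch; this assumes `hfile` is accepted as a data-table base file format for MOR):

```scala
@ParameterizedTest
@CsvSource(Array(
  "COPY_ON_WRITE,parquet", "COPY_ON_WRITE,orc",
  "MERGE_ON_READ,parquet", "MERGE_ON_READ,orc",
  "MERGE_ON_READ,hfile"))
```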




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org