Posted to commits@hudi.apache.org by xu...@apache.org on 2022/02/07 14:28:34 UTC

[hudi] branch master updated: [HUDI-2941] Show _hoodie_operation in spark sql results (#4649)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 773b317  [HUDI-2941] Show _hoodie_operation in spark sql results (#4649)
773b317 is described below

commit 773b3179834d6e0d2912a7a9d4b3b3873d0c2a21
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Feb 7 22:28:13 2022 +0800

    [HUDI-2941] Show _hoodie_operation in spark sql results (#4649)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  4 +-
 .../hudi/table/action/compact/HoodieCompactor.java |  4 +-
 .../hudi/common/table/TableSchemaResolver.java     | 35 +++++++++-------
 .../org/apache/hudi/table/HoodieTableSource.java   |  4 +-
 .../apache/hudi/source/TestStreamReadOperator.java |  4 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  4 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  4 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 48 +++++++++++++++++++++-
 .../hudi/sync/common/AbstractSyncHoodieClient.java | 17 ++------
 .../apache/hudi/utilities/HoodieClusteringJob.java |  4 +-
 10 files changed, 85 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8c958e9..f19b6aa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -679,9 +679,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     Schema writerSchema;
     boolean isValid;
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
       writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
+      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
       isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 73e1413..a3b7a7f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -111,13 +111,13 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
     table.getMetaClient().reloadActiveTimeline();
 
     HoodieTableMetaClient metaClient = table.getMetaClient();
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
 
     // Here we first use the table schema as the reader schema to read the
     // log file. That is because, in the case of MergeInto, config.getSchema may not
     // be the same as the table schema.
     try {
-      Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+      Schema readerSchema = schemaResolver.getTableAvroSchema(false);
       config.setSchema(readerSchema.toString());
     } catch (Exception e) {
       // If there is no commit in the table, just ignore the exception.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index a707748..0607601 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -21,14 +21,13 @@ package org.apache.hudi.common.table;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -42,10 +41,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -61,15 +58,11 @@ public class TableSchemaResolver {
 
   private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
   private final HoodieTableMetaClient metaClient;
-  private final boolean withOperationField;
+  private final boolean hasOperationField;
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
-    this(metaClient, false);
-  }
-
-  public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) {
     this.metaClient = metaClient;
-    this.withOperationField = withOperationField;
+    this.hasOperationField = hasOperationField();
   }
 
   /**
@@ -122,7 +115,7 @@ public class TableSchemaResolver {
     }
   }
 
-  public Schema getTableAvroSchemaFromDataFile() throws Exception {
+  public Schema getTableAvroSchemaFromDataFile() {
     return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
   }
 
@@ -151,7 +144,7 @@ public class TableSchemaResolver {
     Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
     if (schemaFromTableConfig.isPresent()) {
       if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
+        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
       } else {
         return schemaFromTableConfig.get();
       }
@@ -176,7 +169,7 @@ public class TableSchemaResolver {
     }
     Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
     if (schemaFromTableConfig.isPresent()) {
-      Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
+      Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
       return convertAvroSchemaToParquet(schema);
     }
     return getTableParquetSchemaFromDataFile();
@@ -244,7 +237,7 @@ public class TableSchemaResolver {
 
       Schema schema = new Schema.Parser().parse(existingSchemaStr);
       if (includeMetadataFields) {
-        schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField);
+        schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField);
       }
       return Option.of(schema);
     } catch (Exception e) {
@@ -477,4 +470,18 @@ public class TableSchemaResolver {
     }
     return null;
   }
+
+  public boolean isHasOperationField() {
+    return hasOperationField;
+  }
+
+  private boolean hasOperationField() {
+    try {
+      Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
+      return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
+    } catch (Exception e) {
+      LOG.warn("Failed to read operation field from avro schema", e);
+      return false;
+    }
+  }
 }
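
For illustration, a minimal sketch of how callers can probe for the operation field after this change, using only the single-argument constructor (the base path below is a placeholder; the calls mirror the ones exercised in TestInsertTable.scala further down):

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;

    public class OperationFieldProbe {
      public static void main(String[] args) throws Exception {
        // Placeholder base path; point this at an existing Hudi table.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath("/tmp/hudi/h0")
            .setConf(new Configuration())
            .build();

        // The resolver now inspects the data-file schema itself, so no boolean flag is passed.
        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
        System.out.println("_hoodie_operation present: " + schemaResolver.isHasOperationField());

        // Metadata fields (including _hoodie_operation when present) are added automatically.
        Schema tableSchema = schemaResolver.getTableAvroSchema();
        System.out.println(tableSchema.toString(true));
      }
    }
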
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 259c2e4..3efd1d5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -452,8 +452,8 @@ public class HoodieTableSource implements
   @VisibleForTesting
   public Schema getTableAvroSchema() {
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
-      return schemaUtil.getTableAvroSchema();
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+      return schemaResolver.getTableAvroSchema();
     } catch (Throwable e) {
       // table exists but has no written data
       LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 911c685..db45a75 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -245,10 +245,10 @@ public class TestStreamReadOperator {
     final List<String> partitionKeys = Collections.singletonList("partition");
 
     // This input format is used to open the emitted split.
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     final Schema tableAvroSchema;
     try {
-      tableAvroSchema = schemaUtil.getTableAvroSchema();
+      tableAvroSchema = schemaResolver.getTableAvroSchema();
     } catch (Exception e) {
       throw new HoodieException("Get table avro schema error", e);
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 9b96abd..01b480a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -74,8 +74,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
   log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
   private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
-  private val schemaUtil = new TableSchemaResolver(metaClient)
-  private val tableAvroSchema = schemaUtil.getTableAvroSchema
+  private val schemaResolver = new TableSchemaResolver(metaClient)
+  private val tableAvroSchema = schemaResolver.getTableAvroSchema
   private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
   private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 449d87b..2829b4b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -65,10 +65,10 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
   // use schema from latest metadata, if not present, read schema from the data file
-  private val schemaUtil = new TableSchemaResolver(metaClient)
+  private val schemaResolver = new TableSchemaResolver(metaClient)
   private lazy val tableAvroSchema = {
     try {
-      schemaUtil.getTableAvroSchema
+      schemaResolver.getTableAvroSchema
     } catch {
       case _: Throwable => // If there is no commit in the table, we can't get the schema
         // with schemaResolver, use the userSchema instead.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 4d12d98..b186381 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql.SaveMode
 
 import java.io.File
 
@@ -582,8 +586,48 @@ class TestInsertTable extends TestHoodieSqlBase {
       checkAnswer(s"select id, name, price, ts from $tableName")(
         Seq(1, "a1", 11.0, 1000)
       )
-
     }
   }
 
+  test("Test For read operation's field") {
+      withTempDir { tmp => {
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        import spark.implicits._
+        val day = "2021-08-02"
+        val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh")
+        // Write a table by spark dataframe.
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "day,hh")
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key, "true")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+
+        assertResult(true)(new TableSchemaResolver(metaClient).isHasOperationField)
+
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |location '${tablePath}'
+             |""".stripMargin)
+
+        // Note: Spark SQL batch writes currently do not populate the operation field.
+        checkAnswer(s"select id, _hoodie_operation from $tableName")(
+          Seq(1, null)
+        )
+      }
+    }
+  }
 }
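
The user-facing effect of the change, sketched below, assuming a Hudi table written with HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD enabled as in the test above (the table name and location are placeholders):

    import org.apache.spark.sql.SparkSession;

    public class ShowOperationField {
      public static void main(String[] args) {
        // Assumes the Hudi Spark bundle is on the classpath.
        SparkSession spark = SparkSession.builder()
            .appName("show-hoodie-operation")
            .master("local[1]")
            .getOrCreate();

        // Register the existing table; the location is a placeholder.
        spark.sql("create table h0 using hudi location '/tmp/hudi/h0'");

        // _hoodie_operation is now part of the resolved schema and can be selected directly.
        // Spark SQL batch writes do not populate it, so the column reads as null here.
        spark.sql("select id, _hoodie_operation from h0").show();

        spark.stop();
      }
    }
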
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 98b11f2..1815491 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.sync.common;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -29,9 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -149,11 +148,7 @@ public abstract class AbstractSyncHoodieClient {
    */
   public MessageType getDataSchema() {
     try {
-      if (withOperationField) {
-        return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
-      } else {
-        return new TableSchemaResolver(metaClient).getTableParquetSchema();
-      }
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read data schema", e);
     }
@@ -162,11 +157,7 @@ public abstract class AbstractSyncHoodieClient {
   public boolean isDropPartition() {
     try {
       Option<HoodieCommitMetadata> hoodieCommitMetadata;
-      if (withOperationField) {
-        hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
-      } else {
-        hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
-      }
+      hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
 
       if (hoodieCommitMetadata.isPresent()
           && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index a4ee808..b7345a1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -189,11 +189,11 @@ public class HoodieClusteringJob {
   }
 
   private String getSchemaFromLatestInstant() throws Exception {
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
       throw new HoodieException("Cannot run clustering without any completed commits");
     }
-    Schema schema = schemaUtil.getTableAvroSchema(false);
+    Schema schema = schemaResolver.getTableAvroSchema(false);
     return schema.toString();
   }