Posted to commits@hudi.apache.org by ud...@apache.org on 2021/06/17 11:18:44 UTC

[hudi] branch master updated: [HUDI-1879] Fix RO Tables Returning Snapshot Result (#2925)

This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad53cf4  [HUDI-1879] Fix RO Tables Returning Snapshot Result (#2925)
ad53cf4 is described below

commit ad53cf450ef01806ff8b2cfe8ff76fa350a7b4c5
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Jun 17 19:18:21 2021 +0800

    [HUDI-1879] Fix RO Tables Returning Snapshot Result (#2925)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  1 -
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  9 ++-
 .../functional/HoodieSparkSqlWriterSuite.scala     |  6 +-
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 32 +++++++--
 .../org/apache/hudi/hive/util/ConfigUtils.java     |  6 ++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 75 +++++++++++++++++++++-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  3 +-
 7 files changed, 119 insertions(+), 13 deletions(-)
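
In outline, the fix threads the intended Spark query type through Hive sync in two steps. First, HoodieSparkSqlWriter appends three marker entries to the serde properties handed to Hive sync; each marker's value names the Spark datasource option ("hoodie.datasource.query.type") or one of its legal values ("read_optimized", "snapshot"). Second, when HiveSyncTool creates a table it strips the markers and writes one concrete entry: read_optimized on the RO table, snapshot on the RT table (and on COW tables). Spark can then resolve the query type from the table definition instead of answering every table, the RO one included, with snapshot results. Minimal sketches of both steps follow the corresponding diffs below.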

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 4643da5..89faa3b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.HiveSyncTool
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
-import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
 import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.log4j.LogManager
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f992a97..5d6ebd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -437,7 +437,14 @@ object HoodieSparkSqlWriter {
       DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
     if (syncAsDataSourceTable) {
       hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null)
-      hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, basePath.toString)
+      val serdePropText = createSqlTableSerdeProperties(parameters, basePath.toString)
+      val serdeProp = ConfigUtils.toMap(serdePropText)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, DataSourceReadOptions.QUERY_TYPE_OPT_KEY)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      serdeProp.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+
+      hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProp)
+
     }
     hiveSyncConfig
   }
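
The writer-side change above reduces to a ConfigUtils round trip: parse the existing serde-property string, add the three marker entries, and serialize it back. Below is a minimal sketch of that step, assuming hudi-hive-sync on the classpath; the literal option name and values are the ones asserted in the Scala test that follows.

import java.util.Map;

import org.apache.hudi.hive.util.ConfigUtils;

public class SerdeMarkerSketch {
  public static void main(String[] args) {
    // Parse the "k1=v1\nk2=v2" style string used by HiveSyncConfig.serdeProperties.
    Map<String, String> serdeProps = ConfigUtils.toMap("path=/tmp/hoodie_test");

    // Marker entries: each value names the Spark option (or one of its values)
    // that HiveSyncTool later resolves into a concrete query type per table.
    serdeProps.put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
    serdeProps.put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
    serdeProps.put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");

    // Serialize back to the newline-separated form carried on HiveSyncConfig.
    System.out.println(ConfigUtils.configToString(serdeProps));
  }
}
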
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 10141fb..cbae43a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -570,8 +570,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       "{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," +
       "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," +
       "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties)
-
-    assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties)
+    assertResult("path=/tmp/hoodie_test\n" +
+      "spark.query.type.key=hoodie.datasource.query.type\n" +
+      "spark.query.as.rt.key=snapshot\n" +
+      "spark.query.as.ro.key=read_optimized")(hiveSyncConfig.serdeProperties)
   }
 
   test("Test build sync config for skip Ro Suffix vals") {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 18d133b..0dbe97f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -106,13 +106,13 @@ public class HiveSyncTool extends AbstractSyncTool {
       if (hoodieHiveClient != null) {
         switch (hoodieHiveClient.getTableType()) {
           case COPY_ON_WRITE:
-            syncHoodieTable(snapshotTableName, false);
+            syncHoodieTable(snapshotTableName, false, false);
             break;
           case MERGE_ON_READ:
            // sync an RO table for MOR
-            syncHoodieTable(roTableName.get(), false);
+            syncHoodieTable(roTableName.get(), false, true);
            // sync an RT table for MOR
-            syncHoodieTable(snapshotTableName, true);
+            syncHoodieTable(snapshotTableName, true, false);
             break;
           default:
             LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -128,7 +128,8 @@ public class HiveSyncTool extends AbstractSyncTool {
     }
   }
 
-  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
+                               boolean readAsOptimized) {
     LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
         + " of type " + hoodieHiveClient.getTableType());
 
@@ -152,7 +153,7 @@ public class HiveSyncTool extends AbstractSyncTool {
     // Get the parquet schema for this table looking at the latest commit
     MessageType schema = hoodieHiveClient.getDataSchema();
     // Sync schema if needed
-    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
 
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
@@ -177,7 +178,8 @@ public class HiveSyncTool extends AbstractSyncTool {
    * @param tableExists - does table exist
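+   * @param useRealTimeInputFormat - whether to register the realtime input format for the table
+   * @param readAsOptimized - whether queries against the synced table should default to read-optimized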
    * @param schema - extracted schema
    */
-  private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat, MessageType schema) {
+  private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
+                          boolean readAsOptimized, MessageType schema) {
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
@@ -194,11 +196,27 @@ public class HiveSyncTool extends AbstractSyncTool {
       String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
       String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
 
+      Map<String, String> serdeProperties = ConfigUtils.toMap(cfg.serdeProperties);
+
+      // Currently, serdeProperties is non-empty only when Spark syncs its data source metadata.
+      if (!serdeProperties.isEmpty()) {
+        String queryTypeKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_TYPE_KEY);
+        String queryAsROKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RO_KEY);
+        String queryAsRTKey = serdeProperties.remove(ConfigUtils.SPARK_QUERY_AS_RT_KEY);
+
+        if (queryTypeKey != null && queryAsROKey != null && queryAsRTKey != null) {
+          if (readAsOptimized) { // read optimized
+            serdeProperties.put(queryTypeKey, queryAsROKey);
+          } else { // read snapshot
+            serdeProperties.put(queryTypeKey, queryAsRTKey);
+          }
+        }
+      }
       // Custom serde will not work with ALTER TABLE REPLACE COLUMNS
       // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive
       // /ql/exec/DDLTask.java#L3488
       hoodieHiveClient.createTable(tableName, schema, inputFormatClassName,
-          outputFormatClassName, serDeFormatClassName, ConfigUtils.toMap(cfg.serdeProperties), ConfigUtils.toMap(cfg.tableProperties));
+          outputFormatClassName, serDeFormatClassName, serdeProperties, ConfigUtils.toMap(cfg.tableProperties));
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
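
One naming wrinkle in the hunk above is worth spelling out: queryTypeKey holds an option name ("hoodie.datasource.query.type"), while queryAsROKey and queryAsRTKey, despite the "Key" suffix, hold that option's values ("read_optimized" and "snapshot"). The following dependency-free sketch inlines those literals; the class and method names are made up for illustration.

import java.util.HashMap;
import java.util.Map;

public class QueryTypeSwapSketch {

  // Mirrors the logic added to syncSchema(): strip the marker entries and
  // emit one concrete query-type entry for the table being created.
  static Map<String, String> resolveQueryType(Map<String, String> serdeProps,
                                              boolean readAsOptimized) {
    Map<String, String> resolved = new HashMap<>(serdeProps);
    String optionName = resolved.remove("spark.query.type.key"); // "hoodie.datasource.query.type"
    String roValue = resolved.remove("spark.query.as.ro.key");   // "read_optimized"
    String rtValue = resolved.remove("spark.query.as.rt.key");   // "snapshot"
    if (optionName != null && roValue != null && rtValue != null) {
      // RO table -> read_optimized; RT and COW tables -> snapshot.
      resolved.put(optionName, readAsOptimized ? roValue : rtValue);
    }
    return resolved;
  }
}
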
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java
index 8c9dfb6..b8745b6 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java
@@ -24,6 +24,12 @@ import org.apache.hudi.common.util.StringUtils;
 
 public class ConfigUtils {
 
+  public static final String SPARK_QUERY_TYPE_KEY = "spark.query.type.key";
+
+  public static final String SPARK_QUERY_AS_RO_KEY = "spark.query.as.ro.key";
+
+  public static final String SPARK_QUERY_AS_RT_KEY = "spark.query.as.rt.key";
+
   /**
   * Convert the key-value config to a map. The format of the config
    * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3".
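
These three constants name the marker entries themselves; the values stored under them carry the Spark option name and its two possible values. That level of indirection is what lets HiveSyncConfig.serdeProperties, a plain string in the "k1=v1\nk2=v2" format described above, transport the mapping without a new config class or a compile-time dependency on the Spark datasource options.
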
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 118ad12..4324a64 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -264,11 +264,15 @@ public class TestHiveSyncTool {
 
   @ParameterizedTest
   @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
-  public void testSyncWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
+  public void testSyncCOWTableWithProperties(boolean useJdbc,
+                                             boolean useSchemaFromCommitMetadata) throws Exception {
     HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
     Map<String, String> serdeProperties = new HashMap<String, String>() {
       {
         put("path", hiveSyncConfig.basePath);
+        put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
+        put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
+        put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
       }
     };
 
@@ -304,10 +308,79 @@ public class TestHiveSyncTool {
     assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
 
     results.clear();
+    // validate serde properties
     hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
     hiveDriver.getResults(results);
     String ddl = String.join("\n", results);
     assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
+    assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'"));
+  }
+
+  @ParameterizedTest
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+  public void testSyncMORTableWithProperties(boolean useJdbc,
+                                             boolean useSchemaFromCommitMetadata) throws Exception {
+    HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
+    Map<String, String> serdeProperties = new HashMap<String, String>() {
+      {
+        put("path", hiveSyncConfig.basePath);
+        put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
+        put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
+        put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
+      }
+    };
+
+    Map<String, String> tableProperties = new HashMap<String, String>() {
+      {
+        put("tp_0", "p0");
+        put("tp_1", "p1");
+      }
+    };
+    hiveSyncConfig.useJdbc = useJdbc;
+    hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
+    hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
+    String instantTime = "100";
+    String deltaCommitTime = "101";
+    HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+        useSchemaFromCommitMetadata);
+
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    String roTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
+    String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+
+    String[] tableNames = new String[] {roTableName, rtTableName};
+    String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"};
+
+    SessionState.start(HiveTestUtil.getHiveConf());
+    Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());
+
+    for (int i = 0; i < 2; i++) {
+      String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i];
+      String expectQueryType = expectQueryTypes[i];
+
+      hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
+      List<String> results = new ArrayList<>();
+      hiveDriver.getResults(results);
+
+      String tblPropertiesWithoutDdlTime = String.join("\n",
+          results.subList(0, results.size() - 1));
+      assertEquals(
+          "EXTERNAL\tTRUE\n"
+          + "last_commit_time_sync\t101\n"
+          + "tp_0\tp0\n"
+          + "tp_1\tp1", tblPropertiesWithoutDdlTime);
+      assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
+
+      results.clear();
+      // validate serde properties
+      hiveDriver.run("SHOW CREATE TABLE " + dbTableName);
+      hiveDriver.getResults(results);
+      String ddl = String.join("\n", results);
+      assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
+      assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
+    }
   }
 
   @ParameterizedTest
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 74f28cc..1d6bfb4 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -188,7 +188,8 @@ public class HiveTestUtil {
     DateTime dateTime = DateTime.now();
     HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
         useSchemaFromCommitMetadata, dateTime, commitTime);
-    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createdTablesSet
+        .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
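
With those serde properties in place, Spark resolves the query type per table rather than defaulting to snapshot. A hedged usage sketch follows; the database and table names are placeholders, and "_ro"/"_rt" are the suffixes behind SUFFIX_READ_OPTIMIZED_TABLE and SUFFIX_SNAPSHOT_TABLE.

import org.apache.spark.sql.SparkSession;

public class ReadSyncedTablesSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-ro-rt-read")
        .enableHiveSupport()
        .getOrCreate();

    // Read-optimized view: served from base files only.
    spark.sql("SELECT COUNT(*) FROM default.hoodie_test_ro").show();

    // Snapshot (realtime) view: base files merged with log files.
    spark.sql("SELECT COUNT(*) FROM default.hoodie_test_rt").show();

    spark.stop();
  }
}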