Posted to commits@hudi.apache.org by si...@apache.org on 2021/08/05 13:57:31 UTC

[hudi] branch master updated: [HUDI-2233] Use HMS To Sync Hive Meta For Spark Sql (#3387)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dcd6a8  [HUDI-2233] Use HMS To Sync Hive Meta For Spark Sql (#3387)
0dcd6a8 is described below

commit 0dcd6a8fcaff0a8ac2e0fdccbfd14634722185b2
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Aug 5 21:57:22 2021 +0800

    [HUDI-2233] Use HMS To Sync Hive Meta For Spark Sql (#3387)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  9 ++++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 +-
 .../hudi/command/DeleteHoodieTableCommand.scala    |  5 ++-
 .../command/InsertIntoHoodieTableCommand.scala     |  2 ++
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  2 ++
 .../hudi/command/UpdateHoodieTableCommand.scala    |  2 ++
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 15 ++++----
 .../org/apache/hudi/hive/ddl/HiveSyncMode.java     | 42 ++++++++++++++++++++++
 packaging/hudi-spark-bundle/pom.xml                | 14 --------
 pom.xml                                            |  2 --
 10 files changed, 69 insertions(+), 26 deletions(-)
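
For context, this commit adds a new sync-mode config so that Spark SQL can
register Hudi tables through the Hive metastore client (HMS) rather than a
HiveServer2 JDBC connection. A minimal sketch of opting into it from the
datasource write path (the DataFrame df, table name, record key field, and
path below are hypothetical):

    import org.apache.spark.sql.SaveMode

    df.write.format("hudi").
      option("hoodie.table.name", "trips").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.hive_sync.enable", "true").
      option("hoodie.datasource.hive_sync.mode", "hms").   // new in this commit
      option("hoodie.datasource.hive_sync.database", "default").
      option("hoodie.datasource.hive_sync.table", "trips").
      mode(SaveMode.Append).
      save("/tmp/hudi/trips")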

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3c82c44..b12361b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -122,7 +122,6 @@ object DataSourceReadOptions {
   * Options supported for writing hoodie tables.
   */
 object DataSourceWriteOptions {
-
   private val log = LogManager.getLogger(DataSourceWriteOptions.getClass)
 
   val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
@@ -349,9 +348,12 @@ object DataSourceWriteOptions {
     .defaultValue("false")
     .withDocumentation("")
 
+  // From 0.9.0 onwards, use HIVE_SYNC_MODE instead of this config
+  @Deprecated
   val HIVE_USE_JDBC: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.use_jdbc")
     .defaultValue("true")
+    .deprecatedAfter("0.9.0")
     .withDocumentation("Use JDBC when hive synchronization is enabled")
 
   val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = ConfigProperty
@@ -401,6 +403,11 @@ object DataSourceWriteOptions {
     .defaultValue(1000)
     .withDocumentation("The number of partitions one batch when synchronous partitions to hive.")
 
+  val HIVE_SYNC_MODE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.mode")
+    .noDefaultValue()
+    .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e132471..6213ab8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -464,7 +464,7 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.syncAsSparkDataSourceTable =  hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
     hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
     hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
-
+    hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE)
     hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
     hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
     hiveSyncConfig
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index 74fe88b..6c88d89 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hudi.command
 
+import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
-import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, HIVE_SUPPORT_TIMESTAMP, KEYGENERATOR_CLASS, OPERATION, PARTITIONPATH_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -73,6 +75,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
         TABLE_NAME.key -> tableId.table,
         OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_SUPPORT_TIMESTAMP.key -> "true",
         HIVE_STYLE_PARTITIONING.key -> "true",
         HoodieWriteConfig.DELETE_PARALLELISM.key -> "200",
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index bbca17f..9782ec5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -244,6 +245,7 @@ object InsertIntoHoodieTableCommand {
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS.key -> payloadClassName,
         META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
         HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"),
         HIVE_TABLE.key -> table.identifier.table,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index ab714c8..5146868 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal}
@@ -437,6 +438,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
           PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
           PAYLOAD_CLASS.key -> classOf[ExpressionPayload].getCanonicalName,
           META_SYNC_ENABLED.key -> enableHive.toString,
+          HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
           HIVE_USE_JDBC.key -> "false",
           HIVE_DATABASE.key -> targetTableDb,
           HIVE_TABLE.key -> targetTableName,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index e384c41..f6d1195 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, SubqueryAlias, UpdateTable}
@@ -104,6 +105,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
         OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
         META_SYNC_ENABLED.key -> enableHive.toString,
+        HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
         HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
         HIVE_TABLE.key -> tableId.table,
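
Taken together, the four command changes above (delete, insert, merge,
update) pin Spark SQL DML to HMS-based sync. A hypothetical session
illustrating the effect (table name and schema are made up; treat the DDL
syntax as a sketch of the 0.9.0 Spark SQL support):

    // Each statement below now syncs table metadata through the Hive
    // metastore client (HMS mode); no HiveServer2 JDBC endpoint needed.
    spark.sql("create table h0 (id int, name string, price double) using hudi options (primaryKey = 'id')")
    spark.sql("insert into h0 values (1, 'a1', 10.0)")
    spark.sql("update h0 set price = 20.0 where id = 1")
    spark.sql("delete from h0 where id = 1")
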
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 36d7c50..4a22bb8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -22,13 +22,13 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
 import org.apache.hudi.hive.ddl.DDLExecutor;
 import org.apache.hudi.hive.ddl.HMSDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
+import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.hive.ddl.JDBCExecutor;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -69,14 +69,15 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
     // disable jdbc and depend on metastore client for all hive registrations
     try {
       if (!StringUtils.isNullOrEmpty(cfg.syncMode)) {
-        switch (cfg.syncMode.toLowerCase()) {
-          case "hms":
+        HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode);
+        switch (syncMode) {
+          case HMS:
             ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs);
             break;
-          case "hiveql":
+          case HIVEQL:
             ddlExecutor = new HiveQueryDDLExecutor(cfg, fs, configuration);
             break;
-          case "jdbc":
+          case JDBC:
             ddlExecutor = new JDBCExecutor(cfg, fs);
             break;
           default:
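
For readers skimming the hunk above, a compact Scala paraphrase of the new
dispatch (an illustration mirroring the Java switch, assuming cfg, fs, and
configuration are in scope as in the surrounding code; not the actual
source):

    // Parse the configured mode once, then pick the matching DDL executor.
    val ddlExecutor = HiveSyncMode.of(cfg.syncMode) match {
      case HiveSyncMode.HMS    => new HMSDDLExecutor(configuration, cfg, fs)
      case HiveSyncMode.HIVEQL => new HiveQueryDDLExecutor(cfg, fs, configuration)
      case HiveSyncMode.JDBC   => new JDBCExecutor(cfg, fs)
    }
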
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java
new file mode 100644
index 0000000..7e01153
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveSyncMode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.ddl;
+
+import java.util.Locale;
+
+public enum HiveSyncMode {
+  
+  /**
+   * The HMS mode uses the Hive metastore client to sync metadata.
+   */
+  HMS,
+  /**
+   * The HIVEQL mode executes HiveQL statements to sync metadata.
+   */
+  HIVEQL,
+  /**
+   * The JDBC mode uses a Hive JDBC connection to sync metadata.
+   */
+  JDBC
+  ;
+
+  public static HiveSyncMode of(String syncMode) {
+    return HiveSyncMode.valueOf(syncMode.toUpperCase(Locale.ROOT));
+  }
+}
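
A quick illustration of the new enum's parsing behavior (my own example,
not from the commit): HiveSyncMode.of is case-insensitive via Locale.ROOT
upper-casing, and any value outside hms/hiveql/jdbc fails fast with the
IllegalArgumentException thrown by valueOf:

    import org.apache.hudi.hive.ddl.HiveSyncMode

    // Matching ignores case, so config values like "HmS" or "HiveQL" work.
    assert(HiveSyncMode.of("hms") == HiveSyncMode.HMS)
    assert(HiveSyncMode.of("HiveQL") == HiveSyncMode.HIVEQL)
    // HiveSyncMode.of("thrift") would throw IllegalArgumentException
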
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 829e034..9124cfe 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -115,8 +115,6 @@
                   <include>org.apache.curator:curator-client</include>
                   <include>org.apache.curator:curator-recipes</include>
                   <include>commons-codec:commons-codec</include>
-                  <include>org.json:json</include>
-                  <include>org.apache.calcite:calcite-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -369,18 +367,6 @@
       <artifactId>curator-recipes</artifactId>
       <version>${zk-curator.version}</version>
     </dependency>
-<!--    import json to fix the crash when sync meta in spark-->
-    <dependency>
-      <groupId>org.json</groupId>
-      <artifactId>json</artifactId>
-      <version>${json.version}</version>
-    </dependency>
-<!--    import calcite to support hive3 & spark 3-->
-    <dependency>
-      <groupId>org.apache.calcite</groupId>
-      <artifactId>calcite-core</artifactId>
-      <version>${calcite.version}</version>
-    </dependency>
     <!-- TODO: Reinvestigate PR 633 -->
   </dependencies>
 
diff --git a/pom.xml b/pom.xml
index 7bd7cca..5672669 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,8 +152,6 @@
     <presto.bundle.bootstrap.shade.prefix>org.apache.hudi.</presto.bundle.bootstrap.shade.prefix>
     <shadeSources>true</shadeSources>
     <zk-curator.version>2.7.1</zk-curator.version>
-    <json.version>20200518</json.version>
-    <calcite.version>1.16.0</calcite.version>
   </properties>
 
   <scm>