Posted to commits@hudi.apache.org by si...@apache.org on 2022/01/11 05:31:38 UTC

[hudi] 02/06: [HUDI-3112] Fix KafkaConnect cannot sync to Hive Problem (#4458)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2b98d909886c34a8a64c520f459cdf9a1a3a9c6c
Author: Thinking Chen <cd...@hotmail.com>
AuthorDate: Mon Jan 10 07:31:57 2022 +0800

    [HUDI-3112] Fix KafkaConnect cannot sync to Hive Problem (#4458)
---
 .../hudi/connect/utils/KafkaConnectUtils.java      | 31 ++++++++++++++++++++++
 .../hudi/connect/writers/KafkaConnectConfigs.java  | 16 +++++++++++
 .../writers/KafkaConnectTransactionServices.java   | 25 ++++++++++-------
 3 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cc37de2..6a38430 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.connect.ControlMessage;
 import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;
 import org.apache.hudi.keygen.CustomKeyGenerator;
@@ -57,6 +59,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -266,4 +269,32 @@ public class KafkaConnectUtils {
     ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
     return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
   }
+
+  /**
+   * Builds the Hive sync config.
+   * Note: this method is a temporary solution; the longer-term approach is tracked in
+   * https://issues.apache.org/jira/browse/HUDI-3199
+   */
+  public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
+    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
+    hiveSyncConfig.basePath = tableBasePath;
+    hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
+    hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
+    hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
+    hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
+    hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
+    hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
+    hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
+    hiveSyncConfig.partitionValueExtractorClass =
+            props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
+    hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
+    if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
+      hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
+    }
+    hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
+    hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
+    hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
+    hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
+    return hiveSyncConfig;
+  }
 }
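
For readers who want to see the new helper in isolation, here is a minimal sketch of invoking it directly. The database, table, and base path values are hypothetical; any key left unset falls back to the defaults wired into buildSyncConfig (for example the "default" database, SlashEncodedDayPartitionValueExtractor, and useJdbc=true).

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.connect.utils.KafkaConnectUtils;
    import org.apache.hudi.connect.writers.KafkaConnectConfigs;
    import org.apache.hudi.hive.HiveSyncConfig;

    public class BuildSyncConfigSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Hypothetical values; unset keys keep the defaults shown in the hunk above.
        props.put(KafkaConnectConfigs.HIVE_DATABASE, "analytics");
        props.put(KafkaConnectConfigs.HIVE_TABLE, "trips");
        props.put(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, "date");
        props.put(KafkaConnectConfigs.HIVE_SYNC_MODE, "hms");

        HiveSyncConfig syncConfig =
            KafkaConnectUtils.buildSyncConfig(props, "file:///tmp/hoodie/trips");
        System.out.println(syncConfig);
      }
    }
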
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index 1200779..e4543c6 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -164,6 +164,22 @@ public class KafkaConnectConfigs extends HoodieConfig {
     return getString(HADOOP_HOME);
   }
 
+  public static final String HIVE_USE_PRE_APACHE_INPUT_FORMAT = "hoodie.datasource.hive_sync.use_pre_apache_input_format";
+  public static final String HIVE_DATABASE = "hoodie.datasource.hive_sync.database";
+  public static final String HIVE_TABLE = "hoodie.datasource.hive_sync.table";
+  public static final String HIVE_USER = "hoodie.datasource.hive_sync.username";
+  public static final String HIVE_PASS = "hoodie.datasource.hive_sync.password";
+  public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
+  public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
+  public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
+  public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
+  public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
+  public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
+  public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
+  public static final String HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = "hoodie.datasource.hive_sync.skip_ro_suffix";
+  public static final String HIVE_SUPPORT_TIMESTAMP_TYPE = "hoodie.datasource.hive_sync.support_timestamp";
+  public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+
   public static class Builder {
 
     protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
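
The string values of these constants are what users place in the sink connector's configuration. A hedged illustration with hypothetical values, covering the subset most deployments set explicitly:

    import java.util.Properties;

    public class HiveSyncConnectorProps {
      public static void main(String[] args) {
        Properties sinkProps = new Properties();
        // Hypothetical values; the keys are the literal strings declared above.
        sinkProps.setProperty("hoodie.datasource.hive_sync.database", "analytics");
        sinkProps.setProperty("hoodie.datasource.hive_sync.table", "trips");
        sinkProps.setProperty("hoodie.datasource.hive_sync.partition_fields", "date");
        sinkProps.setProperty("hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
        sinkProps.setProperty("hoodie.datasource.hive_sync.mode", "hms");
        // With mode=hms, the metastore URI is read from the Hadoop configuration under
        // hive.metastore.uris instead of the JDBC URL (see the change in the next file).
        sinkProps.list(System.out);
      }
    }
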
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index cca738a..dae19cc 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.connect.writers;
 
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -32,12 +31,14 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.connect.transaction.TransactionCoordinator;
 import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -163,9 +164,9 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   private void syncMeta() {
-    Set<String> syncClientToolClasses = new HashSet<>(
-        Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
     if (connectConfigs.isMetaSyncEnabled()) {
+      Set<String> syncClientToolClasses = new HashSet<>(
+          Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
       for (String impl : syncClientToolClasses) {
         impl = impl.trim();
         switch (impl) {
@@ -185,16 +186,20 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
   }
 
   private void syncHive() {
-    HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
-        new TypedProperties(connectConfigs.getProps()),
-        tableBasePath,
-        "PARQUET");
+    HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
+    String url;
+    if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
+      url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
+    } else {
+      url = hiveSyncConfig.jdbcUrl;
+    }
+
     LOG.info("Syncing target hoodie table with hive table("
         + hiveSyncConfig.tableName
-        + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
+        + "). Hive URL :"
+        + url
         + ", basePath :" + tableBasePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
+    LOG.info("Hive Sync Conf => " + hiveSyncConfig);
     FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
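
To make the new branch easier to trace, here is a standalone restatement of the URL selection using the same identifiers as the patch; the metastore URI and JDBC URL below are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.util.StringUtils;
    import org.apache.hudi.hive.HiveSyncConfig;
    import org.apache.hudi.hive.ddl.HiveSyncMode;

    public class SyncUrlSelectionSketch {
      // Mirrors the patched syncHive(): HMS mode reads the thrift URI from the Hadoop
      // configuration, every other mode keeps using the JDBC URL as before.
      static String resolveSyncUrl(HiveSyncConfig cfg, Configuration hadoopConf) {
        if (!StringUtils.isNullOrEmpty(cfg.syncMode)
            && HiveSyncMode.of(cfg.syncMode) == HiveSyncMode.HMS) {
          return hadoopConf.get("hive.metastore.uris");
        }
        return cfg.jdbcUrl;
      }

      public static void main(String[] args) {
        HiveSyncConfig cfg = new HiveSyncConfig();
        cfg.syncMode = "hms";                          // hypothetical
        cfg.jdbcUrl = "jdbc:hive2://localhost:10000";  // hypothetical
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hive.metastore.uris", "thrift://hive-metastore:9083"); // hypothetical
        System.out.println(resolveSyncUrl(cfg, hadoopConf)); // prints the thrift URI
      }
    }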