Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/07 00:42:30 UTC

[GitHub] [hudi] cdmikechen commented on a change in pull request #4458: [HUDI-3112] Fix KafkaConnect can not sync to Hive Problem

cdmikechen commented on a change in pull request #4458:
URL: https://github.com/apache/hudi/pull/4458#discussion_r779957645



##########
File path: hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
##########
@@ -185,20 +187,50 @@ private void syncMeta() {
   }
 
   private void syncHive() {
-    HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
-        new TypedProperties(connectConfigs.getProps()),
-        tableBasePath,
-        "PARQUET");
+    HiveSyncConfig hiveSyncConfig = buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
+    String url;
+    if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
+      url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
+    } else {
+      url = hiveSyncConfig.jdbcUrl;
+    }
+
     LOG.info("Syncing target hoodie table with hive table("
         + hiveSyncConfig.tableName
         + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
+        + url
         + ", basePath :" + tableBasePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
+    LOG.info("Hive Sync Conf => " + hiveSyncConfig);
     FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
     LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
     new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
   }
+
+  /**
+   * Build Hive Sync Config
+   */
+  public HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {

Review comment:
       @yihua 
   I checked the rest of the Hudi project when I modified this code. Besides Hive sync in Spark, Flink has the same problem. In Flink, however, they redeclared a new set of config variables to work around it:
   https://github.com/apache/hudi/blob/e9efbdb63c95d2971ac67576783e122a6e271738/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L576-L610
   Since unifying these configs would be a relatively large change, it may be better to handle it in a new issue. Because some of the Hive sync logic is written in Scala, it cannot be split out directly.
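
   For reference, the URL selection that this change adds to `syncHive()` can be read as the standalone sketch below. It is only an illustration: `SyncUrlSketch`, `SyncMode`, and `resolveLogUrl` are hypothetical stand-ins, not Hudi classes, and the sketch assumes the sync mode is matched case-insensitively. The real code reads the metastore URIs from `hadoopConf` and the JDBC URL from `hiveSyncConfig`.

```java
import java.util.Locale;

public class SyncUrlSketch {

    // Hypothetical stand-in for the sync mode enum used in the diff.
    enum SyncMode {
        HMS, JDBC, HIVEQL;

        // Assumption: mode strings are matched case-insensitively.
        static SyncMode of(String mode) {
            return valueOf(mode.toUpperCase(Locale.ROOT));
        }
    }

    // Returns the URL to log: the metastore URIs in HMS mode,
    // otherwise (including when no mode is set) the JDBC URL.
    static String resolveLogUrl(String syncMode, String metastoreUris, String jdbcUrl) {
        boolean isHms = syncMode != null
            && !syncMode.isEmpty()
            && SyncMode.of(syncMode) == SyncMode.HMS;
        return isHms ? metastoreUris : jdbcUrl;
    }

    public static void main(String[] args) {
        String metastore = "thrift://localhost:9083";
        String jdbc = "jdbc:hive2://localhost:10000";
        System.out.println(resolveLogUrl("hms", metastore, jdbc));
        System.out.println(resolveLogUrl(null, metastore, jdbc));
    }
}
```

   Keeping the selection in one small helper like this would also make it easier to share between engines if the configs are unified later.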



