Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/01 08:17:43 UTC

[hudi] branch master updated: [HUDI-4174] Add hive conf dir option for flink sink (#5725)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d069b5e57 [HUDI-4174] Add hive conf dir option for flink sink (#5725)
0d069b5e57 is described below

commit 0d069b5e57e96984d508434a9b76c48a282bd45b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Jun 1 16:17:36 2022 +0800

    [HUDI-4174] Add hive conf dir option for flink sink (#5725)
---
 .../java/org/apache/hudi/configuration/FlinkOptions.java   |  6 ++++++
 .../apache/hudi/configuration/HadoopConfigurations.java    | 14 ++++++++++++++
 .../apache/hudi/sink/StreamWriteOperatorCoordinator.java   | 10 +++++++++-
 .../java/org/apache/hudi/sink/utils/HiveSyncContext.java   |  4 +++-
 4 files changed, 32 insertions(+), 2 deletions(-)
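
The patch adds a new hive_sync.conf.dir option that points the Flink Hudi sink at the
directory containing hive-site.xml; when the option is unset, the HIVE_CONF_DIR
environment variable is used as a fallback (see HadoopConfigurations.getHiveConf below).
A minimal usage sketch via Flink SQL DDL; the table name, schema, and paths are
illustrative, only the option keys come from Hudi's FlinkOptions:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HudiHiveSyncExample {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a Hudi sink that syncs to Hive using the conf dir on the client machine.
        tEnv.executeSql(
            "CREATE TABLE hudi_sink (\n"
            + "  id INT,\n"
            + "  name STRING,\n"
            + "  PRIMARY KEY (id) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'hdfs:///tmp/hudi_sink',\n"
            + "  'hive_sync.enabled' = 'true',\n"
            + "  'hive_sync.conf.dir' = '/etc/hive/conf'\n"
            + ")");
      }
    }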

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3de4bd4f75..57cb8daa44 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -769,6 +769,12 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Serde properties to hive table, the data format is k1=v1\nk2=v2");
 
+  public static final ConfigOption<String> HIVE_SYNC_CONF_DIR = ConfigOptions
+      .key("hive_sync.conf.dir")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The hive configuration directory, where the hive-site.xml lies in, the file should be put on the client machine");
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
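
The option is declared with no default; the fallback to the HIVE_CONF_DIR environment
variable happens at the read site in HadoopConfigurations.getHiveConf (next file). A small
sketch of setting and reading the option on a Flink Configuration, with an illustrative path:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class HiveConfDirOption {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString(FlinkOptions.HIVE_SYNC_CONF_DIR, "/etc/hive/conf"); // illustrative path
        // Mirrors the read site below: the HIVE_CONF_DIR env var is the
        // default when the option is not set.
        String dir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
        System.out.println(dir);
      }
    }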
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
index 72f2031150..d15ef280f5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.configuration;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.util.FlinkClientUtil;
 
 import java.util.Map;
@@ -51,4 +53,16 @@ public class HadoopConfigurations {
     options.forEach(hadoopConf::set);
     return hadoopConf;
   }
+
+  /**
+   * Creates a Hive configuration by loading hive-site.xml from the configured conf dir, or a default Hadoop configuration if no conf dir is set.
+   */
+  public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
+    String explicitDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
+    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+    if (explicitDir != null) {
+      hadoopConf.addResource(new Path(explicitDir, "hive-site.xml"));
+    }
+    return hadoopConf;
+  }
 }
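
getHiveConf only layers hive-site.xml from the resolved directory on top of a fresh Hadoop
configuration. The same behavior, sketched directly against the Hadoop API with an
illustrative directory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class GetHiveConfSketch {
      public static void main(String[] args) {
        Configuration hiveConf = new Configuration();
        // Equivalent to getHiveConf with hive_sync.conf.dir = /etc/hive/conf.
        hiveConf.addResource(new Path("/etc/hive/conf", "hive-site.xml"));
        // Properties from hive-site.xml are now visible, e.g. the metastore URIs.
        System.out.println(hiveConf.get("hive.metastore.uris"));
      }
    }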
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 39976e5ee2..75e8beaef1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
@@ -82,6 +84,11 @@ public class StreamWriteOperatorCoordinator
    */
   private final Configuration conf;
 
+  /**
+   * Hive config options.
+   */
+  private final SerializableConfiguration hiveConf;
+
   /**
    * Coordinator context.
    */
@@ -160,6 +167,7 @@ public class StreamWriteOperatorCoordinator
     this.conf = conf;
     this.context = context;
     this.parallelism = context.currentParallelism();
+    this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
   }
 
   @Override
@@ -314,7 +322,7 @@ public class StreamWriteOperatorCoordinator
 
   private void initHiveSync() {
     this.hiveSyncExecutor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
-    this.hiveSyncContext = HiveSyncContext.create(conf);
+    this.hiveSyncContext = HiveSyncContext.create(conf, this.hiveConf);
   }
 
   private void syncHiveAsync() {
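
Hadoop's Configuration does not implement java.io.Serializable, which is why the
coordinator holds the Hive configuration behind Hudi's SerializableConfiguration wrapper
and unwraps it with get() when creating the sync context. A minimal round-trip sketch:

    import org.apache.hudi.common.config.SerializableConfiguration;

    public class WrapHiveConf {
      public static void main(String[] args) {
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        SerializableConfiguration serConf = new SerializableConfiguration(hadoopConf);
        // get() exposes the wrapped Hadoop configuration, as used in HiveSyncContext.create.
        org.apache.hadoop.conf.Configuration unwrapped = serConf.get();
        System.out.println(unwrapped.size()); // number of loaded properties
      }
    }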
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index bd837efc87..9fc5323d46 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -58,7 +59,7 @@ public class HiveSyncContext {
     return new HiveSyncTool(this.syncConfig, this.hiveConf, this.fs);
   }
 
-  public static HiveSyncContext create(Configuration conf) {
+  public static HiveSyncContext create(Configuration conf, SerializableConfiguration serConf) {
     HiveSyncConfig syncConfig = buildSyncConfig(conf);
     org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     String path = conf.getString(FlinkOptions.PATH);
@@ -67,6 +68,7 @@ public class HiveSyncContext {
     if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
       hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
     }
+    hiveConf.addResource(serConf.get());
     hiveConf.addResource(hadoopConf);
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }