You are viewing a plain-text version of this content. The canonical link to the original page was not preserved in this copy.
Posted to commits@hudi.apache.org by vi...@apache.org on 2021/05/20 09:04:11 UTC

[hudi] branch master updated: [HUDI-1871] Fix hive conf for Flink writer hive meta sync (#2968)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 928b09e  [HUDI-1871] Fix hive conf for Flink writer hive meta sync (#2968)
928b09e is described below

commit 928b09ea0b2d97bc5055170c8c87fa0103615b4c
Author: swuferhong <33...@qq.com>
AuthorDate: Thu May 20 17:03:52 2021 +0800

    [HUDI-1871] Fix hive conf for Flink writer hive meta sync (#2968)
---
 .../apache/hudi/configuration/FlinkOptions.java    |  6 ++++++
 .../apache/hudi/sink/utils/HiveSyncContext.java    |  7 +++++--
 packaging/hudi-flink-bundle/pom.xml                | 22 ++++++++++++++++++----
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4f1be0b..8526392 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -423,6 +423,12 @@ public class FlinkOptions {
       .defaultValue("jdbc:hive2://localhost:10000")
       .withDescription("Jdbc URL for hive sync, default 'jdbc:hive2://localhost:10000'");
 
+  public static final ConfigOption<String> HIVE_SYNC_METASTORE_URIS = ConfigOptions
+      .key("hive_sync.metastore.uris")
+      .stringType()
+      .defaultValue("")
+      .withDescription("Metastore uris for hive sync, default ''");
+
   public static final ConfigOption<String> HIVE_SYNC_PARTITION_FIELDS = ConfigOptions
       .key("hive_sync.partition_fields")
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index a791076..79fbd19 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -57,7 +57,10 @@ public class HiveSyncContext {
     String path = conf.getString(FlinkOptions.PATH);
     FileSystem fs = FSUtils.getFs(path, hadoopConf);
     HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(fs.getConf());
+    if (!FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_METASTORE_URIS)) {
+      hadoopConf.set("hive.metastore.uris", conf.getString(FlinkOptions.HIVE_SYNC_METASTORE_URIS));
+    }
+    hiveConf.addResource(hadoopConf);
     return new HiveSyncContext(syncConfig, hiveConf, fs);
   }
 
@@ -71,7 +74,7 @@ public class HiveSyncContext {
     hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
     hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
     hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
-    hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS)
+    hiveSyncConfig.partitionFields = Arrays.stream(conf.getString(FlinkOptions.PARTITION_PATH_FIELD)
         .split(",")).map(String::trim).collect(Collectors.toList());
     hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS);
     hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index ce106d7..9e2b5f9 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -131,6 +131,8 @@
                   <include>org.apache.hive:hive-exec</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
+                  <include>org.datanucleus:datanucleus-core</include>
+                  <include>org.datanucleus:datanucleus-api-jdo</include>
 
                   <include>org.apache.hbase:hbase-common</include>
                   <include>commons-codec:commons-codec</include>
@@ -162,10 +164,6 @@
                   <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.metastore.</shadedPattern>
                 </relocation>
                 <relocation>
-                  <pattern>org.apache.hadoop.hive.ql.</pattern>
-                  <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hadoop.hive.ql.</shadedPattern>
-                </relocation>
-                <relocation>
                   <pattern>org.apache.hive.common.</pattern>
                   <shadedPattern>${flink.bundle.hive.shade.prefix}org.apache.hive.common.</shadedPattern>
                 </relocation>
@@ -440,6 +438,10 @@
           <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.datanucleus</groupId>
+          <artifactId>datanucleus-core</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>javax.servlet.jsp</groupId>
           <artifactId>*</artifactId>
         </exclusion>
@@ -477,6 +479,18 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.datanucleus</groupId>
+      <artifactId>datanucleus-core</artifactId>
+      <scope>${flink.bundle.hive.scope}</scope>
+      <version>5.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.datanucleus</groupId>
+      <artifactId>datanucleus-api-jdo</artifactId>
+      <scope>${flink.bundle.hive.scope}</scope>
+      <version>5.0.1</version>
+    </dependency>
 
     <dependency>
       <groupId>joda-time</groupId>