You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/27 11:37:20 UTC

[hudi] branch master updated: [HUDI-5855] Release resource actively for Flink hive meta sync (#8050)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dd083c41942 [HUDI-5855] Release resource actively for Flink hive meta sync (#8050)
dd083c41942 is described below

commit dd083c41942b785136d65b21f17ff546be5bda86
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Feb 27 19:37:13 2023 +0800

    [HUDI-5855] Release resource actively for Flink hive meta sync (#8050)
---
 .../org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java     |  9 ++++++++-
 .../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java |  5 ++++-
 .../src/main/java/org/apache/hudi/hive/HiveSyncConfig.java   |  2 --
 .../src/main/java/org/apache/hudi/hive/HiveSyncTool.java     | 12 ++++++++----
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index b8f0d565df7..fcea6e578a9 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.aws.sync;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 
@@ -26,6 +28,8 @@ import org.apache.hadoop.conf.Configuration;
 
 import java.util.Properties;
 
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+
 /**
  * Currently Experimental. Utility class that implements syncing a Hudi Table with the
  * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
@@ -56,6 +60,9 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
       cmd.usage();
       System.exit(0);
     }
-    new AwsGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
+    // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
+    TypedProperties props = params.toProps();
+    Configuration hadoopConf = FSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
+    new AwsGlueCatalogSyncTool(props, hadoopConf).syncHoodieTable();
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 9f31d6f59a7..778b528a118 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
@@ -334,7 +335,9 @@ public class StreamWriteOperatorCoordinator
    * Sync hoodie table metadata to Hive metastore.
    */
   public void doSyncHive() {
-    hiveSyncContext.hiveSyncTool().syncHoodieTable();
+    try (HiveSyncTool syncTool = hiveSyncContext.hiveSyncTool()) {
+      syncTool.syncHoodieTable();
+    }
   }
 
   private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 9d23b88cc1b..b5d13bce9dd 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -98,8 +98,6 @@ public class HiveSyncConfig extends HoodieSyncConfig {
     super(props, hadoopConf);
     HiveConf hiveConf = hadoopConf instanceof HiveConf
         ? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
-    // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
-    hiveConf.addResource(getHadoopFileSystem().getConf());
     setHadoopConf(hiveConf);
     validateParameters();
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index f1b22ae6652..5f7fcdb8ae2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -88,14 +88,15 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
-  protected final HiveSyncConfig config;
-  protected final String databaseName;
-  protected final String tableName;
+  private HiveSyncConfig config;
+  private final String databaseName;
+  private final String tableName;
+
   protected HoodieSyncClient syncClient;
   protected String snapshotTableName;
   protected Option<String> roTableName;
 
-  protected String hiveSyncTableStrategy;
+  private String hiveSyncTableStrategy;
 
   public HiveSyncTool(Properties props, Configuration hadoopConf) {
     super(props, hadoopConf);
@@ -204,6 +205,9 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
         throw new HoodieHiveSyncException("Fail to close sync client.", e);
       }
     }
+    if (config != null) {
+      config = null;
+    }
   }
 
   protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {