You are viewing a plain text version of this content. The canonical link for it is available in the original mailing-list archive.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/27 11:37:20 UTC
[hudi] branch master updated: [HUDI-5855] Release resource actively for Flink hive meta sync (#8050)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dd083c41942 [HUDI-5855] Release resource actively for Flink hive meta sync (#8050)
dd083c41942 is described below
commit dd083c41942b785136d65b21f17ff546be5bda86
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Feb 27 19:37:13 2023 +0800
[HUDI-5855] Release resource actively for Flink hive meta sync (#8050)
---
.../org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java | 9 ++++++++-
.../org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 5 ++++-
.../src/main/java/org/apache/hudi/hive/HiveSyncConfig.java | 2 --
.../src/main/java/org/apache/hudi/hive/HiveSyncTool.java | 12 ++++++++----
4 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index b8f0d565df7..fcea6e578a9 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -18,6 +18,8 @@
package org.apache.hudi.aws.sync;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
@@ -26,6 +28,8 @@ import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+
/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
@@ -56,6 +60,9 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
cmd.usage();
System.exit(0);
}
- new AwsGlueCatalogSyncTool(params.toProps(), new Configuration()).syncHoodieTable();
+ // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
+ TypedProperties props = params.toProps();
+ Configuration hadoopConf = FSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new Configuration()).getConf();
+ new AwsGlueCatalogSyncTool(props, hadoopConf).syncHoodieTable();
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 9f31d6f59a7..778b528a118 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -33,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
@@ -334,7 +335,9 @@ public class StreamWriteOperatorCoordinator
* Sync hoodie table metadata to Hive metastore.
*/
public void doSyncHive() {
- hiveSyncContext.hiveSyncTool().syncHoodieTable();
+ try (HiveSyncTool syncTool = hiveSyncContext.hiveSyncTool()) {
+ syncTool.syncHoodieTable();
+ }
}
private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 9d23b88cc1b..b5d13bce9dd 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -98,8 +98,6 @@ public class HiveSyncConfig extends HoodieSyncConfig {
super(props, hadoopConf);
HiveConf hiveConf = hadoopConf instanceof HiveConf
? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
- // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
- hiveConf.addResource(getHadoopFileSystem().getConf());
setHadoopConf(hiveConf);
validateParameters();
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index f1b22ae6652..5f7fcdb8ae2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -88,14 +88,15 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
- protected final HiveSyncConfig config;
- protected final String databaseName;
- protected final String tableName;
+ private HiveSyncConfig config;
+ private final String databaseName;
+ private final String tableName;
+
protected HoodieSyncClient syncClient;
protected String snapshotTableName;
protected Option<String> roTableName;
- protected String hiveSyncTableStrategy;
+ private String hiveSyncTableStrategy;
public HiveSyncTool(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
@@ -204,6 +205,9 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
throw new HoodieHiveSyncException("Fail to close sync client.", e);
}
}
+ if (config != null) {
+ config = null;
+ }
}
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {