You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/04/26 15:18:18 UTC
[iceberg] branch master updated: Hive: Clean up expired metastore clients (#7310)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bbacaf41e2 Hive: Clean up expired metastore clients (#7310)
bbacaf41e2 is described below
commit bbacaf41e23923af3640f3cba3e7aec834abb9e8
Author: frankliee <88...@qq.com>
AuthorDate: Wed Apr 26 23:18:10 2023 +0800
Hive: Clean up expired metastore clients (#7310)
---
core/src/main/java/org/apache/iceberg/ClientPoolImpl.java | 4 ++++
.../src/main/java/org/apache/iceberg/hive/CachedClientPool.java | 7 +++++++
.../test/java/org/apache/iceberg/hive/TestCachedClientPool.java | 4 ++++
3 files changed, 15 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index f63e0e0f33..e8ab57fed3 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -147,4 +147,8 @@ public abstract class ClientPoolImpl<C, E extends Exception>
public int poolSize() {
return poolSize;
}
+
+ public boolean isClosed() {
+ return closed;
+ }
}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index 118299c55f..98859e67d4 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.hive;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
@@ -41,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
import org.apache.thrift.TException;
import org.immutables.value.Value;
@@ -97,10 +99,15 @@ public class CachedClientPool implements ClientPool<IMetaStoreClient, TException
private synchronized void init() {
if (clientPoolCache == null) {
+ // Since Caffeine does not ensure that removalListener will be involved after expiration
+ // We use a scheduler with one thread to clean up expired clients.
clientPoolCache =
Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((ignored, value, cause) -> ((HiveClientPool) value).close())
+ .scheduler(
+ Scheduler.forScheduledExecutorService(
+ ThreadPools.newScheduledPool("hive-metastore-cleaner", 1)))
.build();
}
}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
index a2dd233e44..61e83dc659 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
@@ -46,6 +46,10 @@ public class TestCachedClientPool extends HiveMetastoreTest {
Assert.assertNull(
CachedClientPool.clientPoolCache()
.getIfPresent(CachedClientPool.extractKey(null, hiveConf)));
+
+ // The client has been really closed.
+ Assert.assertTrue(clientPool1.isClosed());
+ Assert.assertTrue(clientPool2.isClosed());
}
@Test