You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2023/04/26 15:18:18 UTC

[iceberg] branch master updated: Hive: Clean up expired metastore clients (#7310)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bbacaf41e2 Hive: Clean up expired metastore clients (#7310)
bbacaf41e2 is described below

commit bbacaf41e23923af3640f3cba3e7aec834abb9e8
Author: frankliee <88...@qq.com>
AuthorDate: Wed Apr 26 23:18:10 2023 +0800

    Hive: Clean up expired metastore clients (#7310)
---
 core/src/main/java/org/apache/iceberg/ClientPoolImpl.java          | 4 ++++
 .../src/main/java/org/apache/iceberg/hive/CachedClientPool.java    | 7 +++++++
 .../test/java/org/apache/iceberg/hive/TestCachedClientPool.java    | 4 ++++
 3 files changed, 15 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index f63e0e0f33..e8ab57fed3 100644
--- a/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -147,4 +147,8 @@ public abstract class ClientPoolImpl<C, E extends Exception>
   public int poolSize() {
     return poolSize;
   }
+
+  public boolean isClosed() {
+    return closed;
+  }
 }
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index 118299c55f..98859e67d4 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.hive;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Comparator;
@@ -41,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
 import org.apache.thrift.TException;
 import org.immutables.value.Value;
 
@@ -97,10 +99,15 @@ public class CachedClientPool implements ClientPool<IMetaStoreClient, TException
 
   private synchronized void init() {
     if (clientPoolCache == null) {
+      // Since Caffeine does not ensure that removalListener will be invoked after expiration,
+      // we use a scheduler with one thread to clean up expired clients.
       clientPoolCache =
           Caffeine.newBuilder()
               .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
               .removalListener((ignored, value, cause) -> ((HiveClientPool) value).close())
+              .scheduler(
+                  Scheduler.forScheduledExecutorService(
+                      ThreadPools.newScheduledPool("hive-metastore-cleaner", 1)))
               .build();
     }
   }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
index a2dd233e44..61e83dc659 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
@@ -46,6 +46,10 @@ public class TestCachedClientPool extends HiveMetastoreTest {
     Assert.assertNull(
         CachedClientPool.clientPoolCache()
             .getIfPresent(CachedClientPool.extractKey(null, hiveConf)));
+
+    // The client pools have actually been closed.
+    Assert.assertTrue(clientPool1.isClosed());
+    Assert.assertTrue(clientPool2.isClosed());
   }
 
   @Test