Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/11 14:05:08 UTC

[GitHub] [iceberg] lcspinter opened a new pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter opened a new pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325


   HiveClientPool instances should be shared between HiveCatalogs if they are targeting the same HiveMetastore. 
   
   What I have done
   - Removed the HiveCatalog global cache calls and marked HiveCatalogs#loadCatalog() as deprecated
   - Created a HiveClientPool cache inside HiveCatalog
   - Created a HiveClientPool cache cleaner thread, which runs periodically (iceberg.hive.client-pool-cache-cleaner-interval) and removes HiveClientPool entries that have not been used for a certain amount of time (iceberg.hive.client-pool-cache-eviction-interval). This ensures that every unused HiveClientPool connection is eventually cleaned up.
   - Removed the finalize() method implementation from HiveCatalog. Previously, the HiveClientPool connections were cleaned up when the HiveCatalog was garbage-collected.
   - Instead of passing the HiveClientPool to HiveTableOperations, the HiveCatalog is now passed so that the HiveClientPool instance can be fetched from the cache.
   - HiveCatalog no longer implements Closeable.
   - Created a unit test to validate the cleaner thread.
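The cache-and-cleaner design described in the list above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the PR's code: the PR uses a Caffeine `Cache` of `Pair<HiveClientPool, Long>` entries guarded by `synchronized` blocks, while here `SharedPoolCache`, `Slot`, and the factory function are hypothetical names, a `HashMap` stands in for Caffeine, and a "pool" is any `AutoCloseable`. Each lookup stores "now + eviction interval" as the deadline, and a daemon thread closes pools whose deadline has passed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch (not the PR's code): one pool per metastore URI, shared by every
// catalog that targets that URI, plus a daemon thread that closes pools left idle too long.
final class SharedPoolCache<P extends AutoCloseable> {
  private static final class Slot<T> {
    final T pool;
    long deadlineMillis; // last lookup + eviction interval

    Slot(T pool, long deadlineMillis) {
      this.pool = pool;
      this.deadlineMillis = deadlineMillis;
    }
  }

  private final Map<String, Slot<P>> slots = new HashMap<>();
  private final Function<String, P> factory;
  private final long evictionIntervalMillis;

  // cleanerIntervalMillis must be positive (scheduleWithFixedDelay rejects a zero delay).
  SharedPoolCache(Function<String, P> factory, long evictionIntervalMillis, long cleanerIntervalMillis) {
    this.factory = factory;
    this.evictionIntervalMillis = evictionIntervalMillis;
    ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread thread = new Thread(runnable, "client-pool-cache-cleaner");
      thread.setDaemon(true); // the cleaner must never keep the JVM alive
      return thread;
    });
    cleaner.scheduleWithFixedDelay(
        this::evictExpired, cleanerIntervalMillis, cleanerIntervalMillis, TimeUnit.MILLISECONDS);
  }

  // Returns the shared pool for the URI, creating it on first use; every lookup pushes the deadline forward.
  synchronized P get(String metastoreUri) {
    long deadline = System.currentTimeMillis() + evictionIntervalMillis;
    Slot<P> slot = slots.computeIfAbsent(metastoreUri, uri -> new Slot<>(factory.apply(uri), deadline));
    slot.deadlineMillis = deadline;
    return slot.pool;
  }

  // Removes and closes every pool whose deadline has passed. Close failures are swallowed,
  // because an exception escaping a scheduled task would suppress all later runs.
  synchronized void evictExpired() {
    long now = System.currentTimeMillis();
    slots.values().removeIf(slot -> {
      if (slot.deadlineMillis > now) {
        return false;
      }
      try {
        slot.pool.close();
      } catch (Exception e) {
        // a real implementation would log this
      }
      return true;
    });
  }

  synchronized int size() {
    return slots.size();
  }
}
```

The `synchronized` methods play the role of the `synchronized (CLIENT_POOL_CACHE)` blocks in the PR: a lookup and an eviction can never interleave, so a pool is not handed out in the same instant it is being closed.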


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596808364



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -100,8 +99,9 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final CachedClientPool metaClients;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
+  protected HiveTableOperations(Configuration conf, CachedClientPool metaClients, FileIO fileIO,

Review comment:
       I changed it to `ClientPool`.


[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596747294



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
##########
@@ -99,7 +105,7 @@ public void before() throws Exception {
 
     Assert.assertEquals(2, ops.current().schema().columns().size());
 
-    spyOps = spy(new HiveTableOperations(overriddenHiveConf, spyClientPool, ops.io(), catalog.name(),
+    spyOps = spy(new HiveTableOperations(overriddenHiveConf,  spyCachedClientPool, ops.io(), catalog.name(),

Review comment:
       nit: extra space

[GitHub] [iceberg] marton-bod commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

marton-bod commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r593303519



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -50,25 +55,37 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  public static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";
+  private static final long CACHE_CLEANER_INTERVAL_DEFAULT = TimeUnit.SECONDS.toMillis(30);
+  public static final String CACHE_EVICTION_INTERVAL = "iceberg.hive.client-pool-cache-eviction-interval";
+  private static final long CACHE_EVICTION_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       Quick question: is there a rationale behind the 5-minute default for cache eviction? Does the pool typically become stale after a few minutes, so that we need to replace it? Just wondering, given that recreating the pool has a cost.
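For reference when weighing that cost: in the `clientPool()` hunk of this PR, every lookup stores `System.currentTimeMillis() + evictionInterval` as the entry's deadline, and the cleaner closes entries whose deadline is `<=` the current time, so the interval behaves as an idle timeout rather than a fixed lifetime. The hypothetical helper `EvictionTiming.closeTime` below simulates that rule. It assumes cleaner runs happen at exact multiples of the cleaner interval and ignores how long each run takes (the initial delay passed to `scheduleWithFixedDelay` is cut off in the diff).

```java
// Simulates the sliding-deadline eviction rule (hypothetical helper, not PR code).
final class EvictionTiming {
  // accessTimes: lookup timestamps in ms, sorted ascending; the pool is created at the first one.
  // Returns the first cleaner tick at which the pool is closed, or -1 if it survives until horizonMillis.
  static long closeTime(long[] accessTimes, long evictionMillis, long cleanerMillis, long horizonMillis) {
    int next = 0;
    long deadline = Long.MAX_VALUE; // no pool yet
    for (long tick = cleanerMillis; tick <= horizonMillis; tick += cleanerMillis) {
      // apply every lookup that happened up to this tick; each one pushes the deadline forward
      while (next < accessTimes.length && accessTimes[next] <= tick) {
        deadline = accessTimes[next] + evictionMillis;
        next++;
      }
      if (deadline <= tick) {
        return tick; // the cleaner sees an expired entry and closes the pool
      }
    }
    return -1;
  }
}
```

With the defaults (5-minute eviction, 30-second cleaner), a pool used once at t=0 is closed at 300,000 ms, a pool used at t=1,000 ms is closed at 330,000 ms, and a pool that is looked up at intervals shorter than the eviction interval is never closed. In other words, an idle pool lives for at most one cleaner interval past its 5-minute idle deadline, and only idle pools pay the recreation cost.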

[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596363309



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -102,14 +94,17 @@ public void initialize(String inputName, Map<String, String> properties) {
       this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
     }
 
-    int clientPoolSize = PropertyUtil.propertyAsInt(properties,
-        CatalogProperties.CLIENT_POOL_SIZE, CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
-    this.clients = new HiveClientPool(clientPoolSize, this.conf);
-    this.createStack = Thread.currentThread().getStackTrace();
-    this.closed = false;
+    this.conf.setInt(CatalogProperties.CLIENT_POOL_SIZE, PropertyUtil.propertyAsInt(properties,
+            CatalogProperties.CLIENT_POOL_SIZE, CatalogProperties.CLIENT_POOL_SIZE_DEFAULT));
+
+    this.conf.setLong(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+            PropertyUtil.propertyAsLong(properties, CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,

Review comment:
       Done.

[GitHub] [iceberg] pvary commented on pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#issuecomment-804001355


   @rymurr, @rdblue: Any more comments? I think this would be ready to be merged.

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596746805



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -100,8 +99,9 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final CachedClientPool metaClients;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
+  protected HiveTableOperations(Configuration conf, CachedClientPool metaClients, FileIO fileIO,

Review comment:
       What is the least restrictive parameter that we can use here? `HiveClientPool`, or `ClientPool`?

[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r597589599



##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -212,7 +214,7 @@ public static boolean hiveCatalog(Configuration conf) {
           LOG.info("Loaded Hadoop catalog {}", catalog);
           return Optional.of(catalog);
         case HIVE:
-          catalog = HiveCatalogs.loadCatalog(conf);
+          catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), conf);

Review comment:
       Done.

[GitHub] [iceberg] marton-bod commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

marton-bod commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r593312779



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));
+      return clientPool;
+    }
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        long cleanerInterval = conf.getLong(CACHE_CLEANER_INTERVAL, CACHE_CLEANER_INTERVAL_DEFAULT);
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> {
+          synchronized (CLIENT_POOL_CACHE) {
+            long currentTime = System.currentTimeMillis();
+            CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                    .filter(e -> e.getValue().second() <= currentTime)
+                    .forEach(e -> {
+                      HiveClientPool pool = e.getValue().first();
+                      CLIENT_POOL_CACHE.invalidate(e.getKey());
+                      pool.close();

Review comment:
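       If we decide to close the pool here regardless of whether any clients are still using it, it's probably worth pointing out in the documentation that the pool has an expiration date and that users should not keep it in a variable for too long. For example:
   ```java
   HiveClientPool pool = catalog.clientPool();
   // ... some other long-running operation, during which the cleaner evicts and closes the pool ...
   pool.run(client -> client.getTable("db", "table")); // -> FAIL: the pool has already been closed
   ```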

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595793707



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPoolProvider.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+public class HiveClientPoolProvider {

Review comment:
       Maybe even something like this:
   https://github.com/apache/iceberg/blob/94705c6799f3b283b45e47e6f9b46b5d081ed666/core/src/main/java/org/apache/iceberg/CachingCatalog.java#L35-L37

[GitHub] [iceberg] pvary commented on pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#issuecomment-805753363


   Merged the PR. Thanks @lcspinter for the PR, and @marton-bod, @rymurr, @rdblue for the review!

[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592700382



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
##########
@@ -400,6 +401,20 @@ public void testTableName() {
     }
   }
 
+  @Test
+  public void testClientPoolCleaner() throws InterruptedException {
+    HiveCatalog catalog = new HiveCatalog(hiveConf);
+    HiveClientPool clientPool1 = catalog.loadHiveClientPool(hiveConf);
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));

Review comment:
       I chose Thread.sleep() because we use this pattern in other test suites as well. Anyway, I changed this part to TimeUnit.SECONDS.sleep() for better readability.

##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
##########
@@ -215,6 +216,8 @@ private void initConf(HiveConf conf, int port) {
     conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
     conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
     conf.set("iceberg.hive.client-pool-size", "2");
+    conf.setLong("iceberg.hive.client-pool-cache-cleaner-interval", TimeUnit.SECONDS.toMillis(5));

Review comment:
       Done.

[GitHub] [iceberg] rymurr commented on pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

rymurr commented on pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#issuecomment-804014676


   Hey @pvary, I am on holiday this week, so no need to wait for me!

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592605668



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -51,36 +55,48 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  private static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";
+  private static final long CACHE_CLEANER_INTERVAL_DEFAULT = TimeUnit.SECONDS.toMillis(30);
+  private static final String CACHE_EVICTION_INTERVAL = "iceberg.hive.client-pool-cache-eviction-interval";
+  private static final long CACHE_EVICTION_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5);
+
+  @VisibleForTesting
+  static final Cache<String, Pair<HiveClientPool, Long>> CLIENT_POOL_CACHE = Caffeine.newBuilder()

Review comment:
       Just a question:
   Wouldn't it make sense to put this into the HiveClientPool instead, with all the scaffolding?

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596881050



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
##########
@@ -44,7 +44,7 @@ public void testConf() {
     conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:/mywarehouse/");
     conf.setInt("iceberg.hive.client-pool-size", 10);
 
-    HiveClientPool clientPool = new HiveClientPool(conf);
+    HiveClientPool clientPool = new HiveClientPool(10, conf);

Review comment:
       This one is not needed anymore.

[GitHub] [iceberg] pvary merged pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary merged pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325

[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595318146



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -98,15 +97,16 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final HiveCatalog catalog;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
-                                String catalogName, String database, String table) {
+  protected HiveTableOperations(Configuration conf, FileIO fileIO, HiveCatalog catalog,

Review comment:
       @rymurr @rdblue You are right. I changed this part.

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592620810



##########
File path: spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
##########
@@ -73,9 +73,6 @@ public static void startMetastoreAndSpark() {
 
   @AfterClass
   public static void stopMetastoreAndSpark() {
-    if (catalog != null) {
-      catalog.close();
-    }

Review comment:
       Should we make this consistent with other tests and set the catalog to null?

[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r615887257



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {

Review comment:
       @rdblue The `ClientPoolImpl` is basically just a renamed copy of `ClientPool` from the previous version of the code, with a slight modification: the `Action` interface and the `run` method were extracted into a separate `ClientPool` interface (which is why the renaming was needed). This interface is implemented by the `CachedClientPool` and `ClientPoolImpl` classes (and indirectly by `HiveClientPool`). 
   When someone calls `run` on an instance of `CachedClientPool`, we just fetch the appropriate `HiveClientPool` instance from the cache and call `run` on that instance. 
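The split described in this comment can be sketched as follows. This is a hypothetical reconstruction, not the merged code: the interface and `Action` names follow the comment, but `SimplePool` (standing in for `ClientPoolImpl`/`HiveClientPool`), `CachedPool` (standing in for `CachedClientPool`), `PoolUser`, and the exact signatures are assumptions; in particular, the real `run` may also declare `InterruptedException`, and a `Supplier` stands in for the cache lookup.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// The extracted interface: callers only see Action and run.
interface ClientPool<C, E extends Exception> {
  interface Action<R, C, E extends Exception> {
    R run(C client) throws E;
  }

  <R> R run(Action<R, C, E> action) throws E;
}

// Stand-in for ClientPoolImpl/HiveClientPool: lends idle clients and refuses work once closed.
final class SimplePool<C> implements ClientPool<C, RuntimeException>, AutoCloseable {
  private final Deque<C> idle = new ArrayDeque<>();
  private final Supplier<C> newClient;
  private boolean closed;

  SimplePool(Supplier<C> newClient) {
    this.newClient = newClient;
  }

  @Override
  public <R> R run(Action<R, C, RuntimeException> action) {
    C client = take();
    try {
      return action.run(client);
    } finally {
      release(client);
    }
  }

  private synchronized C take() {
    if (closed) {
      throw new IllegalStateException("pool is closed");
    }
    return idle.isEmpty() ? newClient.get() : idle.pop();
  }

  private synchronized void release(C client) {
    if (!closed) {
      idle.push(client); // a real pool would also close clients returned after shutdown
    }
  }

  @Override
  public synchronized void close() {
    closed = true;
    idle.clear();
  }
}

// Stand-in for CachedClientPool: holds no pool itself and resolves the shared one on every call.
final class CachedPool<C> implements ClientPool<C, RuntimeException> {
  private final Supplier<SimplePool<C>> lookup;

  CachedPool(Supplier<SimplePool<C>> lookup) {
    this.lookup = lookup;
  }

  @Override
  public <R> R run(Action<R, C, RuntimeException> action) {
    return lookup.get().run(action);
  }
}

// A consumer such as HiveTableOperations only needs the least restrictive type.
final class PoolUser {
  static String describe(ClientPool<String, RuntimeException> clients) {
    return clients.run(client -> "using " + client);
  }
}
```

Because the delegating wrapper never stores the pool it gets back, a pool closed by the cleaner is simply replaced on the next call, whereas code that keeps its own reference to a pool hits the "pool is closed" failure after eviction.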

[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592618888



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
##########
@@ -400,6 +401,20 @@ public void testTableName() {
     }
   }
 
+  @Test
+  public void testClientPoolCleaner() throws InterruptedException {
+    HiveCatalog catalog = new HiveCatalog(hiveConf);
+    HiveClientPool clientPool1 = catalog.loadHiveClientPool(hiveConf);
+    Thread.sleep(TimeUnit.SECONDS.toMillis(5));

Review comment:
       I have found `Clock` very useful for tests that depend on the current time, where we do not want to use `Thread.sleep`...
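A sketch of the `Clock` idea: the expiry check reads time from an injected `java.time.Clock`, so a test can advance a hand-written clock instead of sleeping. `ExpiringEntry` and `MutableClock` are hypothetical helpers, not classes from the PR; the "expired when now >= last access + ttl" rule mirrors the `<=` comparison in the cleaner code under review.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class ClockExpirySketch {

  /** Test clock whose time only moves when the test calls advance(). */
  static class MutableClock extends Clock {
    private Instant now;

    MutableClock(Instant start) {
      this.now = start;
    }

    void advance(Duration d) {
      now = now.plus(d);
    }

    @Override
    public ZoneId getZone() {
      return ZoneOffset.UTC;
    }

    @Override
    public Clock withZone(ZoneId zone) {
      return this; // zone is irrelevant for instant-based expiry checks
    }

    @Override
    public Instant instant() {
      return now;
    }
  }

  /** Tracks the last access of one cache entry, reading time only from the injected Clock. */
  static class ExpiringEntry {
    private final Clock clock;
    private final Duration ttl;
    private Instant lastAccess;

    ExpiringEntry(Clock clock, Duration ttl) {
      this.clock = clock;
      this.ttl = ttl;
      this.lastAccess = clock.instant();
    }

    void touch() {
      lastAccess = clock.instant();
    }

    boolean isExpired() {
      // expired once now >= lastAccess + ttl
      return !clock.instant().isBefore(lastAccess.plus(ttl));
    }
  }
}
```

Production code would pass `Clock.systemUTC()`; the test passes a `MutableClock` and the whole scenario runs without any wall-clock waiting.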




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r615980048



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {

Review comment:
       Thanks, @lcspinter! The context I was missing was that the `CachedClientPool` implements the interface.




[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592615110



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {

Review comment:
       Could we just call it `clientPool`? I would prefer not to "leak" information about how it is handled internally.




[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r597774353



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));
+      return clientPool;
+    }
+  }
+
+  private void scheduleCacheCleaner() {

Review comment:
       @rdblue I made a few changes compared to the original version of the HiveClientPoolCache. Now the cleanup and update are completely delegated to the cache itself. Could you please review it? Thanks 
   




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594631930



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));

Review comment:
       Why use a cache but not delegate thread safety to it? Shouldn't this use `get` with an init function?
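Cache libraries expose an atomic load-or-create call for this (Caffeine's `Cache.get(key, mappingFunction)`, Guava's `Cache.get(key, Callable)`). To keep the sketch dependency-free, it swaps in the JDK's `ConcurrentHashMap.compute`, which runs the remapping function atomically per key, so the outer `synchronized (CLIENT_POOL_CACHE)` block is no longer needed. `AtomicLoadSketch`, `Entry` and `getOrCreate` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

public class AtomicLoadSketch {

  /** Cache value: the shared pool plus the time after which it may be evicted. */
  static final class Entry<P> {
    final P pool;
    final long expiresAtMillis;

    Entry(P pool, long expiresAtMillis) {
      this.pool = pool;
      this.expiresAtMillis = expiresAtMillis;
    }
  }

  /**
   * Returns the pool for the URI, creating it if absent, and refreshes its expiry.
   * compute() makes lookup, creation and the expiry update a single atomic step per key.
   */
  static <P> P getOrCreate(ConcurrentHashMap<String, Entry<P>> cache, String uri,
                           Function<String, P> factory, long ttlMillis, LongSupplier now) {
    Entry<P> entry = cache.compute(uri, (key, existing) -> {
      P pool = existing == null ? factory.apply(key) : existing.pool;
      return new Entry<>(pool, now.getAsLong() + ttlMillis);
    });
    return entry.pool;
  }
}
```

The `factory` should stay cheap and must not touch the same map, since it runs while `compute` holds the per-key lock.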




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594635018



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -98,15 +97,16 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final HiveCatalog catalog;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
-                                String catalogName, String database, String table) {
+  protected HiveTableOperations(Configuration conf, FileIO fileIO, HiveCatalog catalog,

Review comment:
       Agreed. I think that expiring connections in the pool rather than the pool itself would solve this problem and would be a better solution for managing connections in the long term.




[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595792609



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -102,14 +94,17 @@ public void initialize(String inputName, Map<String, String> properties) {
       this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, properties.get(CatalogProperties.WAREHOUSE_LOCATION));
     }
 
-    int clientPoolSize = PropertyUtil.propertyAsInt(properties,
-        CatalogProperties.CLIENT_POOL_SIZE, CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
-    this.clients = new HiveClientPool(clientPoolSize, this.conf);
-    this.createStack = Thread.currentThread().getStackTrace();
-    this.closed = false;
+    this.conf.setInt(CatalogProperties.CLIENT_POOL_SIZE, PropertyUtil.propertyAsInt(properties,
+            CatalogProperties.CLIENT_POOL_SIZE, CatalogProperties.CLIENT_POOL_SIZE_DEFAULT));
+
+    this.conf.setLong(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+            PropertyUtil.propertyAsLong(properties, CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,

Review comment:
       Is there a particular reason not to push this logic into the `HiveClientPoolProvider` and pass it a property map as an input parameter?
   Or, alternatively, pass the values explicitly as constructor parameters?
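A sketch of the "property map as input" option: the provider side parses the catalog properties once, with defaults, instead of the catalog copying each value into the Hadoop `Configuration`. The property keys and default values below are hypothetical placeholders, not the names the PR ended up using in `CatalogProperties`.

```java
import java.util.Map;

public class PoolSettingsSketch {

  // Hypothetical keys and defaults, for illustration only.
  static final String POOL_SIZE = "client-pool-size";
  static final int POOL_SIZE_DEFAULT = 2;
  static final String EVICTION_INTERVAL_MS = "client-pool-eviction-interval-ms";
  static final long EVICTION_INTERVAL_MS_DEFAULT = 300_000L;

  /** Pool settings parsed from the catalog property map, falling back to defaults. */
  static final class PoolSettings {
    final int poolSize;
    final long evictionIntervalMs;

    PoolSettings(Map<String, String> properties) {
      this.poolSize = Integer.parseInt(
          properties.getOrDefault(POOL_SIZE, String.valueOf(POOL_SIZE_DEFAULT)));
      this.evictionIntervalMs = Long.parseLong(
          properties.getOrDefault(EVICTION_INTERVAL_MS, String.valueOf(EVICTION_INTERVAL_MS_DEFAULT)));
    }
  }
}
```

A provider constructor could then take a `PoolSettings` (or the raw map), which keeps these values out of the shared Hadoop `Configuration`.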




[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592611880



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {
+    String metastoreUri = configuration.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+    HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(configuration) : cacheEntry.first();
+    CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis()));
+    return clientPool;
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                .filter(e -> e.getValue().second() + evictionInterval <= System.currentTimeMillis())
+                .forEach(e -> {
+                  HiveClientPool pool = e.getValue().first();
+                  CLIENT_POOL_CACHE.invalidate(e.getKey());
+                  pool.close();

Review comment:
       Could we have a concurrency problem here?
   We decide to evict, but right after the check someone else gets this pool, and then we close it...
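One way to close the check-then-act window, sketched under the assumption that readers replace the cache value with a fresh immutable entry on every access: the cleaner then removes with `ConcurrentHashMap.remove(key, value)`, which succeeds only if the mapping still holds the exact entry it inspected, and closes the pool only on success. `SafeEvictionSketch`, `Entry`, `touch` and `evictExpired` are hypothetical names. This does not protect a caller that obtained the pool just before removal and is still using it; that would additionally need reference counting, or expiring idle connections inside the pool instead of closing whole pools.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeEvictionSketch {

  /** Immutable cache value; readers install a new instance on every access. */
  static final class Entry {
    final AutoCloseable pool;
    final long expiresAtMillis;

    Entry(AutoCloseable pool, long expiresAtMillis) {
      this.pool = pool;
      this.expiresAtMillis = expiresAtMillis;
    }
  }

  /** Reader path: atomically replace the entry to refresh its expiry. */
  static AutoCloseable touch(ConcurrentHashMap<String, Entry> cache, String key, long newExpiryMillis) {
    Entry e = cache.computeIfPresent(key, (k, old) -> new Entry(old.pool, newExpiryMillis));
    return e == null ? null : e.pool;
  }

  /** Cleaner path: returns how many expired entries were removed (and closed). */
  static int evictExpired(ConcurrentHashMap<String, Entry> cache, long nowMillis) {
    int removed = 0;
    for (Map.Entry<String, Entry> e : cache.entrySet()) {
      Entry seen = e.getValue();
      // remove(key, value) fails if a reader replaced the entry after we read it,
      // so a pool that was just handed out is never closed by this loop.
      if (seen.expiresAtMillis <= nowMillis && cache.remove(e.getKey(), seen)) {
        try {
          seen.pool.close();
        } catch (Exception ex) {
          System.err.println("Failed to close pool for " + e.getKey() + ": " + ex);
        }
        removed++;
      }
    }
    return removed;
  }
}
```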




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594633666



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));
+      return clientPool;
+    }
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        long cleanerInterval = conf.getLong(CACHE_CLEANER_INTERVAL, CACHE_CLEANER_INTERVAL_DEFAULT);
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> {
+          synchronized (CLIENT_POOL_CACHE) {
+            long currentTime = System.currentTimeMillis();
+            CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                    .filter(e -> e.getValue().second() <= currentTime)

Review comment:
       Can't expiration be delegated to the cache?




[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595336976



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -50,25 +55,37 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  public static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";

Review comment:
       Right, moved these configs to `CatalogProperties`. 

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -19,11 +19,16 @@
 
 package org.apache.iceberg.hive;
 
-import java.io.Closeable;
-import java.util.Arrays;
+

Review comment:
       Done




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594634855



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));
+      return clientPool;
+    }
+  }
+
+  private void scheduleCacheCleaner() {

Review comment:
       I think it would be cleaner to always keep the pool around and never close it until the cache is cleaned up. Then the pool would always be shared and could be kept as an instance variable inside the catalog. It would also make cleanup less coarse. We could expire individual connections that are unused instead of expiring whole pools. That aligns better with usage patterns and keeps the complexity inside the pool rather than in the cache.
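A sketch of expiring individual idle connections inside a long-lived, shared pool rather than expiring whole pools from the cache. `IdleExpiringPoolSketch` is a hypothetical class, and the time source is injected as a `LongSupplier` so it can be tested without sleeping. Clients are reused LIFO (most recently returned first), so rarely needed extra connections sink to the tail and age out while the pool itself is never closed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class IdleExpiringPoolSketch<C extends AutoCloseable> {

  /** An idle client plus the time it was returned to the pool. */
  private static final class Idle<T> {
    final T client;
    final long returnedAtMillis;

    Idle(T client, long returnedAtMillis) {
      this.client = client;
      this.returnedAtMillis = returnedAtMillis;
    }
  }

  private final Deque<Idle<C>> idle = new ArrayDeque<>(); // head = most recently returned
  private final Supplier<C> factory;
  private final long maxIdleMillis;
  private final LongSupplier clock;

  public IdleExpiringPoolSketch(Supplier<C> factory, long maxIdleMillis, LongSupplier clock) {
    this.factory = factory;
    this.maxIdleMillis = maxIdleMillis;
    this.clock = clock;
  }

  /** Reuse the most recently returned client, or open a new one. */
  public synchronized C borrow() {
    Idle<C> top = idle.pollFirst();
    return top == null ? factory.get() : top.client;
  }

  /** Return a client; it can expire once it has been idle for maxIdleMillis. */
  public synchronized void release(C client) {
    idle.addFirst(new Idle<>(client, clock.getAsLong()));
  }

  /** Close clients idle for too long; the pool itself stays open and shared. */
  public synchronized int expireIdle() {
    long now = clock.getAsLong();
    int closed = 0;
    // The oldest idle clients sit at the tail because release() pushes to the head.
    while (!idle.isEmpty() && now - idle.peekLast().returnedAtMillis >= maxIdleMillis) {
      closeQuietly(idle.pollLast().client);
      closed++;
    }
    return closed;
  }

  public synchronized int idleCount() {
    return idle.size();
  }

  private static void closeQuietly(AutoCloseable client) {
    try {
      client.close();
    } catch (Exception e) {
      System.err.println("Failed to close idle client: " + e);
    }
  }
}
```

A single background task could call `expireIdle()` periodically; because only idle connections are closed, a client currently borrowed by a caller is never touched.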




[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592697678



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {

Review comment:
       Fixed it.




[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592619945



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
##########
@@ -215,6 +216,8 @@ private void initConf(HiveConf conf, int port) {
     conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
     conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
     conf.set("iceberg.hive.client-pool-size", "2");
+    conf.setLong("iceberg.hive.client-pool-cache-cleaner-interval", TimeUnit.SECONDS.toMillis(5));

Review comment:
       Could we use the constants here?




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594635018



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -98,15 +97,16 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final HiveCatalog catalog;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
-                                String catalogName, String database, String table) {
+  protected HiveTableOperations(Configuration conf, FileIO fileIO, HiveCatalog catalog,

Review comment:
       Agreed.




[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592615110



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {

Review comment:
       Could we just call it `clientPool`? I would prefer not to "leak" information about how it is handled internally.
   Also, HiveCatalog already has a `conf`; do we really need a separate `configuration` parameter?




[GitHub] [iceberg] marton-bod commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r597033443



##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -212,7 +214,7 @@ public static boolean hiveCatalog(Configuration conf) {
           LOG.info("Loaded Hadoop catalog {}", catalog);
           return Optional.of(catalog);
         case HIVE:
-          catalog = HiveCatalogs.loadCatalog(conf);
+          catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), conf);

Review comment:
       Shall we replace the "hive" literals with `CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE`?




[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592696818



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {
+    String metastoreUri = configuration.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+    HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(configuration) : cacheEntry.first();
+    CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis()));
+    return clientPool;
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                .filter(e -> e.getValue().second() + evictionInterval <= System.currentTimeMillis())

Review comment:
       Good idea. I changed the code accordingly. 




[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594628716



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -50,25 +55,37 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  public static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";

Review comment:
       Where should these be set? We avoid setting things in Hadoop `Configuration`.
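One way to answer this, sketched under assumptions: read the cache settings from the catalog properties map that a catalog receives at initialization, and fall back to the defaults proposed in this PR (30 seconds for the cleaner, 5 minutes for eviction). The class name and the property keys below are hypothetical. They are not Iceberg's actual `CatalogProperties` constants.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical settings holder: values come from a catalog properties map,
// not from Hadoop Configuration.
final class ClientPoolCacheSettings {
  // Hypothetical property keys, for illustration only.
  static final String CLEANER_INTERVAL_MS = "client-pool-cache.cleaner-interval-ms";
  static final String EVICTION_INTERVAL_MS = "client-pool-cache.eviction-interval-ms";

  // Defaults match the values proposed in this PR.
  static final long CLEANER_INTERVAL_MS_DEFAULT = TimeUnit.SECONDS.toMillis(30);
  static final long EVICTION_INTERVAL_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5);

  final long cleanerIntervalMs;
  final long evictionIntervalMs;

  ClientPoolCacheSettings(Map<String, String> properties) {
    this.cleanerIntervalMs = longProperty(properties, CLEANER_INTERVAL_MS, CLEANER_INTERVAL_MS_DEFAULT);
    this.evictionIntervalMs = longProperty(properties, EVICTION_INTERVAL_MS, EVICTION_INTERVAL_MS_DEFAULT);
  }

  // Returns the parsed value for key, or defaultValue when the key is absent.
  private static long longProperty(Map<String, String> properties, String key, long defaultValue) {
    String value = properties.get(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```

Because the map is supplied per catalog, two catalogs in the same JVM can use different thresholds without touching a shared Hadoop `Configuration`.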






[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595791255



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPoolProvider.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+public class HiveClientPoolProvider {

Review comment:
       Could this object implement the `ClientPool` interface, and maybe be renamed to `CachedClientPool`?
   
   That way we could pass this `ClientPool` to HiveTableOperations and would not need to change anything there.
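A minimal sketch of the suggested shape, assuming a simplified `ClientPool` interface that is re-declared here so the example compiles on its own. Its `run(Action)` signature follows the one visible in `ClientPoolImpl` further down this thread; the real Iceberg interface may differ. The hypothetical `CachedClientPool` implements the same interface and delegates to one shared pool per metastore URI, so code that accepts a `ClientPool` would not need to change. The static map and the factory parameter are assumptions for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Simplified stand-in, re-declared so the sketch compiles on its own.
interface ClientPool<C, E extends Exception> {
  interface Action<R, C, E extends Exception> {
    R run(C client) throws E;
  }

  <R> R run(Action<R, C, E> action) throws E, InterruptedException;
}

// Hypothetical wrapper: implements the same interface, so callers that accept
// a ClientPool keep working, while the underlying pool is shared per URI.
final class CachedClientPool<C, E extends Exception> implements ClientPool<C, E> {
  // Shared across all wrapper instances: one underlying pool per metastore URI.
  private static final ConcurrentMap<String, ClientPool<?, ?>> POOLS = new ConcurrentHashMap<>();

  private final String metastoreUri;
  private final Function<String, ClientPool<C, E>> poolFactory;

  CachedClientPool(String metastoreUri, Function<String, ClientPool<C, E>> poolFactory) {
    this.metastoreUri = metastoreUri;
    this.poolFactory = poolFactory;
  }

  @SuppressWarnings("unchecked")
  ClientPool<C, E> delegate() {
    // computeIfAbsent runs the factory at most once per URI.
    return (ClientPool<C, E>) POOLS.computeIfAbsent(metastoreUri, poolFactory);
  }

  @Override
  public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
    return delegate().run(action);
  }
}
```

Since the shared map outlives any single wrapper, something still has to close idle pools, which is why the PR pairs the cache with an eviction policy.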






[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596685329



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -83,12 +78,15 @@ public HiveCatalog() {
   @Deprecated
   public HiveCatalog(Configuration conf) {
     this.name = "hive";
-    int clientPoolSize = conf.getInt(CatalogProperties.CLIENT_POOL_SIZE, CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
-    this.clients = new HiveClientPool(clientPoolSize, conf);
     this.conf = conf;
-    this.createStack = Thread.currentThread().getStackTrace();
-    this.closed = false;
     this.fileIO = new HadoopFileIO(conf);
+    Map<String, String> properties = ImmutableMap.of(
+            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+            conf.get(CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS),
+            CatalogProperties.CLIENT_POOL_SIZE,
+            conf.get(CatalogProperties.CLIENT_POOL_SIZE)
+    );
+    this.clientPool = new CachedClientPool(conf, properties);

Review comment:
       Could we rename this back to `clients`? That way there are fewer changes.






[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r594628877



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -19,11 +19,16 @@
 
 package org.apache.iceberg.hive;
 
-import java.io.Closeable;
-import java.util.Arrays;
+

Review comment:
       Nit: please remove unnecessary whitespace changes.






[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595315023



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -51,36 +55,48 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  private static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";
+  private static final long CACHE_CLEANER_INTERVAL_DEFAULT = TimeUnit.SECONDS.toMillis(30);
+  private static final String CACHE_EVICTION_INTERVAL = "iceberg.hive.client-pool-cache-eviction-interval";
+  private static final long CACHE_EVICTION_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5);
+
+  @VisibleForTesting
+  static final Cache<String, Pair<HiveClientPool, Long>> CLIENT_POOL_CACHE = Caffeine.newBuilder()

Review comment:
       Thanks @pvary and @rymurr for the reviews. I created a new class to handle all the caching-related logic.






[GitHub] [iceberg] pvary commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592610329



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {
+    String metastoreUri = configuration.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+    HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(configuration) : cacheEntry.first();
+    CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis()));
+    return clientPool;
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                .filter(e -> e.getValue().second() + evictionInterval <= System.currentTimeMillis())

Review comment:
       Maybe store the eviction time instead of the pair, and compute `System.currentTimeMillis()` once, outside of the stream, to shave off a few cycles.
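A sketch of this suggestion, assuming a hypothetical `ExpiringPoolCache` backed by a `ConcurrentHashMap` rather than the PR's Caffeine cache. Each entry stores its absolute eviction deadline, so a sweep reads the clock once and does one comparison per entry. It also uses `ConcurrentHashMap.remove(key, value)`, which only removes a slot that was not replaced since the sweep read it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical cache: value per key is the pool plus its absolute eviction deadline.
final class ExpiringPoolCache<P> {
  private static final class Slot<T> {
    final T pool;
    final long deadlineMillis;

    Slot(T pool, long deadlineMillis) {
      this.pool = pool;
      this.deadlineMillis = deadlineMillis;
    }
  }

  private final Map<String, Slot<P>> slots = new ConcurrentHashMap<>();
  private final long evictionIntervalMillis;

  ExpiringPoolCache(long evictionIntervalMillis) {
    this.evictionIntervalMillis = evictionIntervalMillis;
  }

  // Records a use at nowMillis; the deadline is computed here, not on every sweep.
  void touch(String key, P pool, long nowMillis) {
    slots.put(key, new Slot<>(pool, nowMillis + evictionIntervalMillis));
  }

  // One cleaner pass. nowMillis is read once by the caller, so each entry costs
  // a single comparison. Returns the number of pools evicted and closed.
  int sweep(long nowMillis, Consumer<? super P> closer) {
    int evicted = 0;
    for (Map.Entry<String, Slot<P>> e : slots.entrySet()) {
      Slot<P> slot = e.getValue();
      // remove(key, value) only succeeds if the slot was not replaced since it was read.
      if (slot.deadlineMillis <= nowMillis && slots.remove(e.getKey(), slot)) {
        closer.accept(slot.pool);
        evicted++;
      }
    }
    return evicted;
  }
}
```

A scheduled executor could then call `sweep(System.currentTimeMillis(), closer)` on each tick, reading the clock once per sweep instead of once per entry.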






[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595335982



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -50,25 +55,37 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  public static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";
+  private static final long CACHE_CLEANER_INTERVAL_DEFAULT = TimeUnit.SECONDS.toMillis(30);
+  public static final String CACHE_EVICTION_INTERVAL = "iceberg.hive.client-pool-cache-eviction-interval";
+  private static final long CACHE_EVICTION_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5);

Review comment:
       @marton-bod There is no particular reason for choosing this value.
   If we keep the ClientPool and its connections open, we might end up with a memory leak (with the removal of the finalize() method, nobody closes them). To tackle this issue, I decided to close every unused ClientPool after a certain amount of time. This threshold is configurable and should be tuned based on the load/usage of the cluster.






[GitHub] [iceberg] StefanXiepj commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
StefanXiepj commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r639376971



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
+  private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
+
+  private final int poolSize;
+  private final Deque<C> clients;
+  private final Class<? extends E> reconnectExc;
+  private final Object signal = new Object();
+  private volatile int currentSize;
+  private boolean closed;
+
+  ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
+    this.poolSize = poolSize;
+    this.reconnectExc = reconnectExc;
+    this.clients = new ArrayDeque<>(poolSize);
+    this.currentSize = 0;
+    this.closed = false;
+  }
+
+  @Override
+  public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+    C client = get();
+    try {
+      return action.run(client);
+
+    } catch (Exception exc) {
+      if (isConnectionException(exc)) {
+        try {
+          client = reconnect(client);
+        } catch (Exception ignored) {
+          // if reconnection throws any exception, rethrow the original failure
+          throw reconnectExc.cast(exc);
+        }
+
+        return action.run(client);
+      }
+
+      throw exc;
+
+    } finally {
+      release(client);
+    }
+  }
+
+  protected abstract C newClient();
+
+  protected abstract C reconnect(C client);
+
+  protected boolean isConnectionException(Exception exc) {
+    return reconnectExc.isInstance(exc);
+  }
+
+  protected abstract void close(C client);
+
+  @Override
+  public void close() {
+    this.closed = true;
+    try {
+      while (currentSize > 0) {
+        if (!clients.isEmpty()) {
+          synchronized (this) {
+            if (!clients.isEmpty()) {
+              C client = clients.removeFirst();
+              close(client);
+              currentSize -= 1;
+            }
+          }
+        }
+        if (clients.isEmpty() && currentSize > 0) {
+          // wake every second in case this missed the signal
+          synchronized (signal) {
+            signal.wait(1000);
+          }
+        }
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);

Review comment:
       @lcspinter Hi, I have a question for you: why not catch InterruptedException inside the while loop and continue closing the other clients? Or throw the exception directly? Thanks!
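One possible answer, sketched with a hypothetical simplified pool (`DrainingPool`, not the PR's `ClientPoolImpl`): an interrupt only stops the wait for clients that are still checked out. Idle clients are still closed, the interrupt flag is restored before returning, and clients released after shutdown are closed on release. The boolean result, reporting whether every client was closed, is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

final class DrainingPool<C> {
  private final Deque<C> idle = new ArrayDeque<>(); // guarded by this
  private final Consumer<C> closer;
  private int currentSize; // clients created and not yet closed (guarded by this)
  private boolean closed;  // guarded by this

  DrainingPool(Consumer<C> closer) {
    this.closer = closer;
  }

  // Registers a newly created client as idle.
  synchronized void add(C client) {
    idle.addLast(client);
    currentSize++;
  }

  // Checks out an idle client (assumes one is available).
  synchronized C borrow() {
    return idle.removeFirst();
  }

  // Returns a client; after shutdown it is closed instead of re-pooled.
  synchronized void release(C client) {
    if (closed) {
      closer.accept(client);
      currentSize--;
    } else {
      idle.addLast(client);
    }
    notifyAll();
  }

  // Returns true if every client was closed before returning.
  synchronized boolean close(long waitMillis) {
    closed = true;
    boolean interrupted = false;
    while (currentSize > 0) {
      // Always close whatever is idle right now, even after an interrupt.
      while (!idle.isEmpty()) {
        closer.accept(idle.removeFirst());
        currentSize--;
      }
      if (currentSize == 0 || interrupted) {
        break; // done, or interrupted: stop waiting for checked-out clients
      }
      try {
        wait(waitMillis); // woken by release(), or re-check after the timeout
      } catch (InterruptedException e) {
        interrupted = true; // remember it, but finish this pass first
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt(); // restore the flag for the caller
    }
    return currentSize == 0;
  }
}
```

Rethrowing immediately would leave idle clients open; swallowing the interrupt entirely would hide it from the caller. This variant does neither.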






[GitHub] [iceberg] marton-bod commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r593308684



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();

Review comment:
       Can we restrict the scope of the synchronized block to contain only the cache population part? That way we could decrease contention on the more frequent cache lookup operation. If `cacheEntry` is null, we enter the block and re-check whether it has been populated by another thread in the meantime.
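A sketch of the suggested narrowing, using a hypothetical `PoolRegistry` backed by a `ConcurrentHashMap` rather than the PR's Caffeine cache. A lookup that hits takes no lock. Only a miss enters the synchronized block, where the map is re-checked because another thread may have created the pool in the meantime. The `ConcurrentHashMap` matters here: it safely publishes a pool created by one thread to lock-free readers in other threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: one pool per metastore URI, created at most once.
final class PoolRegistry<P> {
  private final Object lock = new Object();
  private final Map<String, P> pools = new ConcurrentHashMap<>();
  private final Function<String, P> factory;

  PoolRegistry(Function<String, P> factory) {
    this.factory = factory;
  }

  P get(String metastoreUri) {
    P pool = pools.get(metastoreUri); // fast path: no lock on a cache hit
    if (pool == null) {
      synchronized (lock) {
        // Re-check: another thread may have created the pool while we waited.
        pool = pools.get(metastoreUri);
        if (pool == null) {
          pool = factory.apply(metastoreUri);
          pools.put(metastoreUri, pool);
        }
      }
    }
    return pool;
  }
}
```

`ConcurrentHashMap.computeIfAbsent`, or Caffeine's `Cache.get(key, mappingFunction)`, gives the same create-at-most-once behavior without an explicit lock.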






[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592697550



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -604,4 +596,34 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool loadHiveClientPool(Configuration configuration) {
+    String metastoreUri = configuration.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+    HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(configuration) : cacheEntry.first();
+    CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis()));
+    return clientPool;
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                .filter(e -> e.getValue().second() + evictionInterval <= System.currentTimeMillis())
+                .forEach(e -> {
+                  HiveClientPool pool = e.getValue().first();
+                  CLIENT_POOL_CACHE.invalidate(e.getKey());
+                  pool.close();

Review comment:
       You are right, I missed this one. We should synchronize on the CLIENT_POOL_CACHE when inserting/deleting entries. 
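A minimal sketch of that fix, assuming a hypothetical `LockedPoolCache` built on plain `HashMap`s rather than the PR's Caffeine cache. Two operations hold the same lock: the loader's lookup plus timestamp refresh, and the cleaner's expiry check plus removal. As a result, the cleaner cannot remove a pool between a loader's lookup and its refresh. Expired pools are returned so the caller can close them outside the lock, and a slow `close()` then does not block loaders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class LockedPoolCache<P> {
  private final Object lock = new Object();
  private final Map<String, P> pools = new HashMap<>();       // guarded by lock
  private final Map<String, Long> lastUsed = new HashMap<>(); // guarded by lock
  private final Function<String, P> factory;
  private final long evictionIntervalMillis;

  LockedPoolCache(Function<String, P> factory, long evictionIntervalMillis) {
    this.factory = factory;
    this.evictionIntervalMillis = evictionIntervalMillis;
  }

  // Insert path: lookup (or creation) and timestamp refresh happen atomically.
  P load(String key, long nowMillis) {
    synchronized (lock) {
      P pool = pools.computeIfAbsent(key, factory);
      lastUsed.put(key, nowMillis);
      return pool;
    }
  }

  // Delete path: expiry check and removal happen atomically. Returns the
  // removed pools so the caller can close them outside the lock.
  List<P> evictExpired(long nowMillis) {
    List<P> expired = new ArrayList<>();
    synchronized (lock) {
      Iterator<Map.Entry<String, Long>> it = lastUsed.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<String, Long> e = it.next();
        if (e.getValue() + evictionIntervalMillis <= nowMillis) {
          expired.add(pools.remove(e.getKey()));
          it.remove();
        }
      }
    }
    return expired;
  }
}
```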






[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r597583276



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCachedClientPool extends HiveMetastoreTest {
+
+  @Test
+  public void testClientPoolCleaner() throws InterruptedException {
+    String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
+    HiveClientPool clientPool1 = clientPool.clientPool();
+    Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
+    TimeUnit.SECONDS.sleep(8);
+    HiveClientPool clientPool2 = clientPool.clientPool();
+    Assert.assertTrue(clientPool1 == clientPool2);
+    TimeUnit.SECONDS.sleep(15);

Review comment:
       Good idea. 






[GitHub] [iceberg] lcspinter commented on pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#issuecomment-796820310


   @rymurr @pvary @marton-bod Could you please review this PR? Thank you!




[GitHub] [iceberg] StefanXiepj commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
StefanXiepj commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r639376971



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
+  private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
+
+  private final int poolSize;
+  private final Deque<C> clients;
+  private final Class<? extends E> reconnectExc;
+  private final Object signal = new Object();
+  private volatile int currentSize;
+  private boolean closed;
+
+  ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
+    this.poolSize = poolSize;
+    this.reconnectExc = reconnectExc;
+    this.clients = new ArrayDeque<>(poolSize);
+    this.currentSize = 0;
+    this.closed = false;
+  }
+
+  @Override
+  public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
+    C client = get();
+    try {
+      return action.run(client);
+
+    } catch (Exception exc) {
+      if (isConnectionException(exc)) {
+        try {
+          client = reconnect(client);
+        } catch (Exception ignored) {
+          // if reconnection throws any exception, rethrow the original failure
+          throw reconnectExc.cast(exc);
+        }
+
+        return action.run(client);
+      }
+
+      throw exc;
+
+    } finally {
+      release(client);
+    }
+  }
+
+  protected abstract C newClient();
+
+  protected abstract C reconnect(C client);
+
+  protected boolean isConnectionException(Exception exc) {
+    return reconnectExc.isInstance(exc);
+  }
+
+  protected abstract void close(C client);
+
+  @Override
+  public void close() {
+    this.closed = true;
+    try {
+      while (currentSize > 0) {
+        if (!clients.isEmpty()) {
+          synchronized (this) {
+            if (!clients.isEmpty()) {
+              C client = clients.removeFirst();
+              close(client);
+              currentSize -= 1;
+            }
+          }
+        }
+        if (clients.isEmpty() && currentSize > 0) {
+          // wake every second in case this missed the signal
+          synchronized (signal) {
+            signal.wait(1000);
+          }
+        }
+      }
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);

Review comment:
       Hi, why not catch InterruptedException inside the while loop and continue closing the other clients? Or throw the exception directly?





[GitHub] [iceberg] rdblue commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r615323627



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
##########
@@ -0,0 +1,145 @@
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {

Review comment:
       @lcspinter, @pvary, why was it necessary to create an abstract client pool implementation?
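The quoted diff shows that `ClientPoolImpl` declares abstract `newClient()`, `reconnect(C)`, and `close(C)` methods. This is a template-method design: the borrow/release and reconnect-and-retry logic is written once, and each concrete pool supplies only the client lifecycle. The sketch below shows that pattern in isolation. `SimplePool`, `FakePool`, `FakeClient`, and `ConnectionLost` are hypothetical types, and the sketch does not answer on behalf of the PR authors.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical template-method pool: borrowing, releasing, and reconnect-and-retry are
// written once here; a subclass only decides how its client type is created and reconnected.
abstract class SimplePool<C, E extends Exception> {
  interface Action<R, T, X extends Exception> {
    R run(T client) throws X;
  }

  private final Deque<C> idle = new ArrayDeque<>();
  private final Class<? extends E> reconnectExc;

  SimplePool(Class<? extends E> reconnectExc) {
    this.reconnectExc = reconnectExc;
  }

  protected abstract C newClient();

  protected abstract C reconnect(C client);

  public <R> R run(Action<R, C, E> action) throws E {
    C client = get();
    try {
      return action.run(client);
    } catch (Exception exc) {
      if (reconnectExc.isInstance(exc)) {
        client = reconnect(client); // retry once on a fresh connection
        return action.run(client);
      }
      throw exc; // precise rethrow: only E or unchecked exceptions can reach this point
    } finally {
      release(client);
    }
  }

  private synchronized C get() {
    return idle.isEmpty() ? newClient() : idle.removeFirst(); // no size limit in this sketch
  }

  private synchronized void release(C client) {
    idle.addFirst(client);
  }
}

// A checked "connection lost" error for the example.
class ConnectionLost extends Exception {
  ConnectionLost(String message) {
    super(message);
  }
}

// Hypothetical client whose generation number shows which connection served a call.
class FakeClient {
  final int generation;

  FakeClient(int generation) {
    this.generation = generation;
  }
}

// The only code a concrete pool has to write: how to create and reconnect its client type.
class FakePool extends SimplePool<FakeClient, ConnectionLost> {
  int created = 0;

  FakePool() {
    super(ConnectionLost.class);
  }

  @Override
  protected FakeClient newClient() {
    created += 1;
    return new FakeClient(created);
  }

  @Override
  protected FakeClient reconnect(FakeClient client) {
    return newClient();
  }
}
```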





[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r592700768



##########
File path: spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
##########
@@ -73,9 +73,6 @@ public static void startMetastoreAndSpark() {
 
   @AfterClass
   public static void stopMetastoreAndSpark() {
-    if (catalog != null) {
-      catalog.close();
-    }

Review comment:
       It is nulled out in the next line. 





[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595338438



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));

Review comment:
       Good idea. I delegated the complete update/invalidation logic to the cache.
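The following JDK-only sketch shows what delegating the update to the cache buys. The quoted diff pairs `getIfPresent()` and `put()` under `synchronized (CLIENT_POOL_CACHE)`. This illustration swaps in `ConcurrentHashMap.compute`, which creates or refreshes the entry atomically per key. `PoolCache`, `Timed`, and `peek` are hypothetical names. The injectable clock exists only to make the example testable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical JDK-only sketch: one atomic compute() call creates the pool on a miss and
// pushes its expiry forward on a hit, instead of getIfPresent() + put() under a global lock.
class PoolCache<P> {
  // Pairs a pool with the time after which an idle-entry cleaner may evict it.
  static final class Timed<T> {
    final T pool;
    final long expiresAt;

    Timed(T pool, long expiresAt) {
      this.pool = pool;
      this.expiresAt = expiresAt;
    }
  }

  private final ConcurrentMap<String, Timed<P>> entries = new ConcurrentHashMap<>();
  private final long evictionIntervalMs;
  private final LongSupplier clock; // injectable so the example is testable

  PoolCache(long evictionIntervalMs, LongSupplier clock) {
    this.evictionIntervalMs = evictionIntervalMs;
    this.clock = clock;
  }

  P get(String metastoreUri, Function<String, P> factory) {
    long now = clock.getAsLong();
    // ConcurrentHashMap applies the remapping function atomically per key, so two callers
    // asking for the same URI at the same time cannot both create a pool.
    Timed<P> entry = entries.compute(metastoreUri, (uri, existing) ->
        new Timed<>(existing == null ? factory.apply(uri) : existing.pool, now + evictionIntervalMs));
    return entry.pool;
  }

  Timed<P> peek(String metastoreUri) {
    return entries.get(metastoreUri);
  }
}
```

Caffeine offers a comparable single call. `Cache.get(key, mappingFunction)` computes a missing value atomically, and `expireAfterAccess` plus a `removalListener` could take over from a hand-written cleaner. Treat that pairing as a suggestion, not a description of the final PR.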

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();
+      CLIENT_POOL_CACHE.put(metastoreUri, Pair.of(clientPool, System.currentTimeMillis() + evictionInterval));
+      return clientPool;
+    }
+  }
+
+  private void scheduleCacheCleaner() {
+    if (cleaner == null) {
+      synchronized (HiveCatalog.class) {
+        if (cleaner == null) {
+          cleaner = Executors.newSingleThreadScheduledExecutor(
+                  new ThreadFactoryBuilder()
+                          .setDaemon(true)
+                          .setNameFormat("iceberg-client-pool-cache-cleaner-%d")
+                          .build());
+        }
+        long cleanerInterval = conf.getLong(CACHE_CLEANER_INTERVAL, CACHE_CLEANER_INTERVAL_DEFAULT);
+        ScheduledFuture<?> futures = cleaner.scheduleWithFixedDelay(() -> {
+          synchronized (CLIENT_POOL_CACHE) {
+            long currentTime = System.currentTimeMillis();
+            CLIENT_POOL_CACHE.asMap().entrySet().stream()
+                    .filter(e -> e.getValue().second() <= currentTime)

Review comment:
       Done.





[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r596363192



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPoolProvider.java
##########
@@ -0,0 +1,69 @@
+
+package org.apache.iceberg.hive;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+
+public class HiveClientPoolProvider {

Review comment:
       Refactored it.





[GitHub] [iceberg] rymurr commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r593053021



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
##########
@@ -98,15 +97,16 @@
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
+  private final HiveCatalog catalog;
 
-  protected HiveTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
-                                String catalogName, String database, String table) {
+  protected HiveTableOperations(Configuration conf, FileIO fileIO, HiveCatalog catalog,

Review comment:
       Along with my comment above, I don't think the table operations should see the entire catalog, only the client pool.
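The narrower dependency suggested here can be sketched with hypothetical types. The table operations receive an interface that only lends out clients, so they never depend on the catalog, and a test can hand them a stub pool. `MetastoreClients`, `SingleClientPool`, `FakeMetastoreClient`, and `TableOps` are illustrative names, not Iceberg classes.

```java
import java.util.function.Function;

// Hypothetical narrow capability: run an action with a borrowed metastore client.
interface MetastoreClients<C> {
  <R> R run(Function<C, R> action);
}

// Hypothetical single-client implementation, enough for the example.
class SingleClientPool<C> implements MetastoreClients<C> {
  private final C client;

  SingleClientPool(C client) {
    this.client = client;
  }

  @Override
  public synchronized <R> R run(Function<C, R> action) {
    return action.apply(client);
  }
}

// Hypothetical stand-in for a metastore client.
class FakeMetastoreClient {
  String tableLocation(String database, String table) {
    return "/warehouse/" + database + ".db/" + table;
  }
}

// Table operations depend only on the pool, so they cannot reach catalog-level state
// (configuration, namespace handling, other tables) that they have no reason to use.
class TableOps {
  private final MetastoreClients<FakeMetastoreClient> clients;
  private final String database;
  private final String table;

  TableOps(MetastoreClients<FakeMetastoreClient> clients, String database, String table) {
    this.clients = clients;
    this.database = database;
    this.table = table;
  }

  String location() {
    return clients.run(client -> client.tableLocation(database, table));
  }
}
```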

##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -51,36 +55,48 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
 
+  private static final String CACHE_CLEANER_INTERVAL = "iceberg.hive.client-pool-cache-cleaner-interval";
+  private static final long CACHE_CLEANER_INTERVAL_DEFAULT = TimeUnit.SECONDS.toMillis(30);
+  private static final String CACHE_EVICTION_INTERVAL = "iceberg.hive.client-pool-cache-eviction-interval";
+  private static final long CACHE_EVICTION_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5);
+
+  @VisibleForTesting
+  static final Cache<String, Pair<HiveClientPool, Long>> CLIENT_POOL_CACHE = Caffeine.newBuilder()

Review comment:
       I agree. From my reading of this PR, most of the client-pool-related code is a separate concern from the catalog itself and should live in a separate class.
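The JDK-only sketch below shows one way to make that separation. A single class owns the per-URI map, the idle expiry, and the cleaner thread, so the catalog touches it only through `get(uri)`. `PoolRegistry` and its methods are hypothetical. `ConcurrentHashMap` stands in for the Caffeine cache used in the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical registry owning everything pool-related: the per-URI cache, idle expiry,
// and the background cleaner. A catalog would only ever call get(uri).
final class PoolRegistry<P extends AutoCloseable> {
  private static final class Timed<T> {
    final T pool;
    final long lastAccess;

    Timed(T pool, long lastAccess) {
      this.pool = pool;
      this.lastAccess = lastAccess;
    }
  }

  private final ConcurrentMap<String, Timed<P>> entries = new ConcurrentHashMap<>();
  private final Function<String, P> factory;
  private final long maxIdleMs;
  private final LongSupplier clock;

  PoolRegistry(Function<String, P> factory, long maxIdleMs, LongSupplier clock) {
    this.factory = factory;
    this.maxIdleMs = maxIdleMs;
    this.clock = clock;
  }

  P get(String uri) {
    long now = clock.getAsLong();
    Timed<P> entry = entries.compute(uri, (key, old) ->
        new Timed<>(old == null ? factory.apply(key) : old.pool, now));
    return entry.pool;
  }

  int size() {
    return entries.size();
  }

  // Closes pools idle for longer than maxIdleMs. remove(key, value) succeeds only if no
  // concurrent get() has replaced the entry since it was read here.
  void evictIdle() {
    long now = clock.getAsLong();
    for (Map.Entry<String, Timed<P>> e : entries.entrySet()) {
      Timed<P> entry = e.getValue();
      if (now - entry.lastAccess > maxIdleMs && entries.remove(e.getKey(), entry)) {
        try {
          entry.pool.close();
        } catch (Exception ex) {
          // Keep going: one failed close must not stop the remaining evictions.
          System.err.println("Failed to close evicted pool for " + e.getKey() + ": " + ex);
        }
      }
    }
  }

  // Runs evictIdle() periodically on a daemon thread so it never blocks JVM shutdown.
  ScheduledExecutorService startCleaner(long intervalMs) {
    ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread thread = new Thread(runnable, "client-pool-cache-cleaner");
      thread.setDaemon(true);
      return thread;
    });
    cleaner.scheduleWithFixedDelay(this::evictIdle, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    return cleaner;
  }
}
```

The conditional `remove(key, value)` compares against the exact entry the cleaner read. So a pool that another thread refreshed in the meantime is left alone instead of being closed under its new user.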





[GitHub] [iceberg] marton-bod commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r597029835



##########
File path: hive-metastore/src/test/java/org/apache/iceberg/hive/TestCachedClientPool.java
##########
@@ -0,0 +1,43 @@
+
+package org.apache.iceberg.hive;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCachedClientPool extends HiveMetastoreTest {
+
+  @Test
+  public void testClientPoolCleaner() throws InterruptedException {
+    String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
+    HiveClientPool clientPool1 = clientPool.clientPool();
+    Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
+    TimeUnit.SECONDS.sleep(8);
+    HiveClientPool clientPool2 = clientPool.clientPool();
+    Assert.assertTrue(clientPool1 == clientPool2);
+    TimeUnit.SECONDS.sleep(15);

Review comment:
       Shall we reference the 10-second eviction interval from the base class here? That way we could replace the magic numbers with expressions like `HiveMetastoreTest.EVICTION_INTERVAL - TimeUnit.SECONDS.toMillis(5)` and `HiveMetastoreTest.EVICTION_INTERVAL + TimeUnit.SECONDS.toMillis(5)`. What do you think?
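The suggestion comes down to a little arithmetic on a shared constant. The sketch below assumes the 10-second eviction interval and 5-second margin mentioned in the comment. `EvictionTiming` and its members are hypothetical, and the thread does not show the name or unit of the base-class constant, so neither is confirmed here.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical constants: assumes the base test class exposes a 10-second eviction
// interval; these names are illustrative, not HiveMetastoreTest's actual fields.
final class EvictionTiming {
  static final long EVICTION_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
  static final long MARGIN_MS = TimeUnit.SECONDS.toMillis(5);

  private EvictionTiming() {
  }

  // Sleep this long after the last access: the cached pool must still be returned.
  static long beforeEviction() {
    return EVICTION_INTERVAL_MS - MARGIN_MS;
  }

  // Sleep this long after the last access: the pool should have been evicted by now,
  // assuming the cleaner runs more often than MARGIN_MS.
  static long afterEviction() {
    return EVICTION_INTERVAL_MS + MARGIN_MS;
  }
}
```

With helpers like these, each hard-coded sleep would become something like `TimeUnit.MILLISECONDS.sleep(EvictionTiming.beforeEviction())`. If a periodic cleaner performs the eviction rather than the read path, the margin must also exceed the cleaner's interval. Otherwise the second check can race the cleaner.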





[GitHub] [iceberg] lcspinter commented on a change in pull request #2325: Hive: Use HiveClientPool cache instead of HiveCatalog global cache.

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2325:
URL: https://github.com/apache/iceberg/pull/2325#discussion_r595319019



##########
File path: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
##########
@@ -547,4 +541,42 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return conf;
   }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    synchronized (CLIENT_POOL_CACHE) {
+      String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+      Pair<HiveClientPool, Long> cacheEntry = CLIENT_POOL_CACHE.getIfPresent(metastoreUri);
+      HiveClientPool clientPool = cacheEntry == null ? new HiveClientPool(clientPoolSize, conf) : cacheEntry.first();

Review comment:
       @marton-bod Thanks for the review. I delegated the cache update/cleanup to the cache itself, so it is now thread-safe.



