You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:25 UTC

[iceberg] 10/18: Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 49b944257ec59e6886119e9b44184ec0e1818d4a
Author: Carm <15...@qq.com>
AuthorDate: Tue Feb 9 16:16:59 2021 +0800

    Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)
---
 .../java/org/apache/iceberg/hive/ClientPool.java   |   6 +-
 .../org/apache/iceberg/hive/HiveClientPool.java    |   6 ++
 .../org/apache/iceberg/hive/TestClientPool.java    | 110 +++++++++++++++++----
 3 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
index 85ce587..7fe9b66 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -54,7 +54,7 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
       return action.run(client);
 
     } catch (Exception exc) {
-      if (reconnectExc.isInstance(exc)) {
+      if (isConnectionException(exc)) {
         try {
           client = reconnect(client);
         } catch (Exception ignored) {
@@ -76,6 +76,10 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
 
   protected abstract C reconnect(C client);
 
+  protected boolean isConnectionException(Exception exc) {
+    return reconnectExc.isInstance(exc);
+  }
+
   protected abstract void close(C client);
 
   @Override
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 1df705b..e6d70cb 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -84,6 +84,12 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException>
   }
 
   @Override
+  protected boolean isConnectionException(Exception e) {
+    return super.isConnectionException(e) || (e instanceof MetaException && e.getMessage() != null &&
+        e.getMessage().contains("Got exception: org.apache.thrift.transport.TTransportException"));
+  }
+
+  @Override
   protected void close(HiveMetaStoreClient client) {
     client.close();
   }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
index cdf59d8..f88f385 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
@@ -19,36 +19,106 @@
 
 package org.apache.iceberg.hive;
 
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestClientPool {
 
-  @Test(expected = RuntimeException.class)
-  public void testNewClientFailure() throws Exception {
-    try (MockClientPool pool = new MockClientPool(2, Exception.class)) {
-      pool.run(Object::toString);
-    }
+  HiveClientPool clients;
+
+  @Before
+  public void before() {
+    HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+    clients = Mockito.spy(clientPool);
   }
 
-  private static class MockClientPool extends ClientPool<Object, Exception> {
+  @After
+  public void after() {
+    clients.close();
+    clients = null;
+  }
 
-    MockClientPool(int poolSize, Class<? extends Exception> reconnectExc) {
-      super(poolSize, reconnectExc);
-    }
+  @Test
+  public void testNewClientFailure() {
+    Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+    AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+        "Connection exception", () -> clients.run(Object::toString));
+  }
+
+  @Test
+  public void testGetTablesFailsForNonReconnectableException() throws Exception {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    Mockito.doThrow(new MetaException("Another meta exception"))
+        .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
+    AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+        "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
+  }
 
-    @Override
-    protected Object newClient() {
-      throw new RuntimeException();
-    }
+  @Test
+  public void testConnectionFailureRestoreForMetaException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
 
-    @Override
-    protected Object reconnect(Object client) {
-      return null;
-    }
+    // Throwing an exception may trigger the client to reconnect.
+    String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+    Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
 
-    @Override
-    protected void close(Object client) {
+    // Create a new client when the reconnect method is called.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    List<String> databases = Lists.newArrayList("db1", "db2");
+
+    Mockito.doReturn(databases).when(newClient).getAllDatabases();
+    // The return is OK when the reconnect method is called.
+    Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases()));
+
+    // Verify that the method is called.
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForTTransportException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+    Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
+
+    // Create a new client when getAllFunctions() failed.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+    response.addToFunctions(
+        new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
+    Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+    Assert.assertEquals(response, clients.run(client -> client.getAllFunctions()));
+
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  private HiveMetaStoreClient newClient() {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    return hmsClient;
+  }
 
-    }
+  private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+    HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+    return newClient;
   }
 }