You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:25 UTC
[iceberg] 10/18: Hive: Fix connection pool fails to reconnect to
the Hive Metastore #1994 (#2119)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 49b944257ec59e6886119e9b44184ec0e1818d4a
Author: Carm <15...@qq.com>
AuthorDate: Tue Feb 9 16:16:59 2021 +0800
Hive: Fix connection pool fails to reconnect to the Hive Metastore #1994 (#2119)
---
.../java/org/apache/iceberg/hive/ClientPool.java | 6 +-
.../org/apache/iceberg/hive/HiveClientPool.java | 6 ++
.../org/apache/iceberg/hive/TestClientPool.java | 110 +++++++++++++++++----
3 files changed, 101 insertions(+), 21 deletions(-)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
index 85ce587..7fe9b66 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -54,7 +54,7 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
return action.run(client);
} catch (Exception exc) {
- if (reconnectExc.isInstance(exc)) {
+ if (isConnectionException(exc)) {
try {
client = reconnect(client);
} catch (Exception ignored) {
@@ -76,6 +76,10 @@ public abstract class ClientPool<C, E extends Exception> implements Closeable {
protected abstract C reconnect(C client);
+ protected boolean isConnectionException(Exception exc) {
+ return reconnectExc.isInstance(exc);
+ }
+
protected abstract void close(C client);
@Override
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 1df705b..e6d70cb 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -84,6 +84,12 @@ public class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException>
}
@Override
+ protected boolean isConnectionException(Exception e) {
+ // Anything the base pool already classifies as a connection failure stays one.
+ if (super.isConnectionException(e)) {
+ return true;
+ }
+ // Additionally accept a MetaException whose message reports a wrapped TTransportException.
+ // instanceof is false for null, so no separate null check on e is needed.
+ // getMessage() may return null; guard it so this check cannot throw a NullPointerException
+ // from inside the caller's catch block and hide the original failure.
+ String message = e instanceof MetaException ? e.getMessage() : null;
+ return message != null &&
+ message.contains("Got exception: org.apache.thrift.transport.TTransportException");
+ }
+
+ @Override
protected void close(HiveMetaStoreClient client) {
client.close();
}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
index cdf59d8..f88f385 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPool.java
@@ -19,36 +19,106 @@
package org.apache.iceberg.hive;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestClientPool {
- @Test(expected = RuntimeException.class)
- public void testNewClientFailure() throws Exception {
- try (MockClientPool pool = new MockClientPool(2, Exception.class)) {
- pool.run(Object::toString);
- }
+ HiveClientPool clients;
+
+ // Builds a real HiveClientPool (constructed with 2 and a fresh Configuration) and wraps it in a
+ // Mockito spy so that each test can stub individual pool methods.
+ @Before
+ public void before() {
+ clients = Mockito.spy(new HiveClientPool(2, new Configuration()));
}
- private static class MockClientPool extends ClientPool<Object, Exception> {
+ // Closes the spied pool after each test and drops the reference to it.
+ @After
+ public void after() {
+ clients.close();
+ clients = null;
+ }
- MockClientPool(int poolSize, Class<? extends Exception> reconnectExc) {
- super(poolSize, reconnectExc);
- }
+ @Test
+ public void testNewClientFailure() {
+ Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+ AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+ "Connection exception", () -> clients.run(Object::toString));
+ }
+
+ // A MetaException whose message does not mention a transport failure is expected to be
+ // rethrown by run() unchanged.
+ @Test
+ public void testGetTablesFailsForNonReconnectableException() throws Exception {
+ // The pool hands out a mock metastore client whose getTables() always fails.
+ HiveMetaStoreClient metastore = newClient();
+ MetaException unrelatedFailure = new MetaException("Another meta exception");
+ Mockito.doThrow(unrelatedFailure)
+ .when(metastore).getTables(Mockito.anyString(), Mockito.anyString());
+
+ AssertHelpers.assertThrows(
+ "Should throw exception",
+ MetaException.class,
+ "Another meta exception",
+ () -> clients.run(client -> client.getTables("default", "t")));
+ }
- @Override
- protected Object newClient() {
- throw new RuntimeException();
- }
+ @Test
+ public void testConnectionFailureRestoreForMetaException() throws Exception {
+ HiveMetaStoreClient hmsClient = newClient();
- @Override
- protected Object reconnect(Object client) {
- return null;
- }
+ // The first client fails with a MetaException whose message names a TTransportException,
+ // which HiveClientPool treats as a connection failure.
+ String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+ Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
- @Override
- protected void close(Object client) {
+ // Stub the pool's reconnect(hmsClient) to return a second mock client.
+ HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+ List<String> databases = Lists.newArrayList("db1", "db2");
+
+ Mockito.doReturn(databases).when(newClient).getAllDatabases();
+ // run() is expected to succeed with the value served by the replacement client.
+ Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases()));
+
+ // reconnect was invoked once for the failed client and never for its replacement.
+ Mockito.verify(clients).reconnect(hmsClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+ }
+
+ @Test
+ public void testConnectionFailureRestoreForTTransportException() throws Exception {
+ // The first client fails with a TTransportException.
+ HiveMetaStoreClient staleClient = newClient();
+ Mockito.doThrow(new TTransportException()).when(staleClient).getAllFunctions();
+
+ // Stub the pool's reconnect(staleClient) to return a second mock client.
+ HiveMetaStoreClient freshClient = reconnect(staleClient);
+
+ Function concat =
+ new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null);
+ GetAllFunctionsResponse expected = new GetAllFunctionsResponse();
+ expected.addToFunctions(concat);
+ Mockito.doReturn(expected).when(freshClient).getAllFunctions();
+
+ GetAllFunctionsResponse actual = clients.run(client -> client.getAllFunctions());
+ Assert.assertEquals(expected, actual);
+
+ // reconnect was invoked once for the failed client and never for its replacement.
+ Mockito.verify(clients).reconnect(staleClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(freshClient);
+ }
+
+ private HiveMetaStoreClient newClient() {
+ HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(hmsClient).when(clients).newClient();
+ return hmsClient;
+ }
- }
+ private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+ HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+ return newClient;
}
}