You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/04/18 01:00:50 UTC
[iceberg] branch master updated: Core: Move ClientPool and ClientPoolImpl to core (#2491)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 186a078 Core: Move ClientPool and ClientPoolImpl to core (#2491)
186a078 is described below
commit 186a0786f43a5bfb9d5a199bfa205a58fba6c6bb
Author: ismail simsek <ge...@hotmail.com>
AuthorDate: Sun Apr 18 03:00:41 2021 +0200
Core: Move ClientPool and ClientPoolImpl to core (#2491)
---
.../main/java/org/apache/iceberg}/ClientPool.java | 2 +-
.../java/org/apache/iceberg}/ClientPoolImpl.java | 8 +-
.../org/apache/iceberg/hive/CachedClientPool.java | 1 +
.../java/org/apache/iceberg/hive/HiveCatalog.java | 1 +
.../org/apache/iceberg/hive/HiveClientPool.java | 1 +
.../apache/iceberg/hive/HiveTableOperations.java | 1 +
.../apache/iceberg/hive/TestClientPoolImpl.java | 124 ---------------------
.../apache/iceberg/hive/TestHiveClientPool.java | 98 ++++++++++++++++
8 files changed, 106 insertions(+), 130 deletions(-)
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java b/core/src/main/java/org/apache/iceberg/ClientPool.java
similarity index 96%
rename from hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
rename to core/src/main/java/org/apache/iceberg/ClientPool.java
index 7f32cf9..9aa32bd 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPool.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
public interface ClientPool<C, E extends Exception> {
interface Action<R, C, E extends Exception> {
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
similarity index 94%
rename from hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
rename to core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
index d1a44d3..7f31cc9 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java
+++ b/core/src/main/java/org/apache/iceberg/ClientPoolImpl.java
@@ -17,12 +17,11 @@
* under the License.
*/
-package org.apache.iceberg.hive;
+package org.apache.iceberg;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +36,7 @@ public abstract class ClientPoolImpl<C, E extends Exception> implements Closeabl
private volatile int currentSize;
private boolean closed;
- ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
+ public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
@@ -138,8 +137,7 @@ public abstract class ClientPoolImpl<C, E extends Exception> implements Closeabl
}
}
- @VisibleForTesting
- int poolSize() {
+ public int poolSize() {
return poolSize;
}
}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index eca505c..0079621 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.thrift.TException;
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index e89f6cd..9b8e0b4 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index ef31e3b..fb2deed 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.iceberg.ClientPoolImpl;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.thrift.TException;
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 4aa1911..7c45882 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.TableMetadata;
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPoolImpl.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPoolImpl.java
deleted file mode 100644
index c35247d..0000000
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestClientPoolImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.hive;
-
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestClientPoolImpl {
-
- HiveClientPool clients;
-
- @Before
- public void before() {
- HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
- clients = Mockito.spy(clientPool);
- }
-
- @After
- public void after() {
- clients.close();
- clients = null;
- }
-
- @Test
- public void testNewClientFailure() {
- Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
- AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
- "Connection exception", () -> clients.run(Object::toString));
- }
-
- @Test
- public void testGetTablesFailsForNonReconnectableException() throws Exception {
- HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
- Mockito.doReturn(hmsClient).when(clients).newClient();
- Mockito.doThrow(new MetaException("Another meta exception"))
- .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
- AssertHelpers.assertThrows("Should throw exception", MetaException.class,
- "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
- }
-
- @Test
- public void testConnectionFailureRestoreForMetaException() throws Exception {
- HiveMetaStoreClient hmsClient = newClient();
-
- // Throwing an exception may trigger the client to reconnect.
- String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
- Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
-
- // Create a new client when the reconnect method is called.
- HiveMetaStoreClient newClient = reconnect(hmsClient);
-
- List<String> databases = Lists.newArrayList("db1", "db2");
-
- Mockito.doReturn(databases).when(newClient).getAllDatabases();
- // The return is OK when the reconnect method is called.
- Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases()));
-
- // Verify that the method is called.
- Mockito.verify(clients).reconnect(hmsClient);
- Mockito.verify(clients, Mockito.never()).reconnect(newClient);
- }
-
- @Test
- public void testConnectionFailureRestoreForTTransportException() throws Exception {
- HiveMetaStoreClient hmsClient = newClient();
- Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
-
- // Create a new client when getAllFunctions() failed.
- HiveMetaStoreClient newClient = reconnect(hmsClient);
-
- GetAllFunctionsResponse response = new GetAllFunctionsResponse();
- response.addToFunctions(
- new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
- Mockito.doReturn(response).when(newClient).getAllFunctions();
-
- Assert.assertEquals(response, clients.run(client -> client.getAllFunctions()));
-
- Mockito.verify(clients).reconnect(hmsClient);
- Mockito.verify(clients, Mockito.never()).reconnect(newClient);
- }
-
- private HiveMetaStoreClient newClient() {
- HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
- Mockito.doReturn(hmsClient).when(clients).newClient();
- return hmsClient;
- }
-
- private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
- HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
- Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
- return newClient;
- }
-}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
index b7870af..33b4a78 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
@@ -23,9 +23,23 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestHiveClientPool {
@@ -38,6 +52,20 @@ public class TestHiveClientPool {
" </property>\n" +
"</configuration>\n";
+ HiveClientPool clients;
+
+ @Before
+ public void before() {
+ HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+ clients = Mockito.spy(clientPool);
+ }
+
+ @After
+ public void after() {
+ clients.close();
+ clients = null;
+ }
+
@Test
public void testConf() {
HiveConf conf = createHiveConf();
@@ -65,4 +93,74 @@ public class TestHiveClientPool {
}
return hiveConf;
}
+
+ @Test
+ public void testNewClientFailure() {
+ Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+ AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+ "Connection exception", () -> clients.run(Object::toString));
+ }
+
+ @Test
+ public void testGetTablesFailsForNonReconnectableException() throws Exception {
+ HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(hmsClient).when(clients).newClient();
+ Mockito.doThrow(new MetaException("Another meta exception"))
+ .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
+ AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+ "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
+ }
+
+ @Test
+ public void testConnectionFailureRestoreForMetaException() throws Exception {
+ HiveMetaStoreClient hmsClient = newClient();
+
+ // Throwing an exception may trigger the client to reconnect.
+ String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+ Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
+
+ // Create a new client when the reconnect method is called.
+ HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+ List<String> databases = Lists.newArrayList("db1", "db2");
+
+ Mockito.doReturn(databases).when(newClient).getAllDatabases();
+ // The return is OK when the reconnect method is called.
+ Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases()));
+
+ // Verify that the method is called.
+ Mockito.verify(clients).reconnect(hmsClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+ }
+
+ @Test
+ public void testConnectionFailureRestoreForTTransportException() throws Exception {
+ HiveMetaStoreClient hmsClient = newClient();
+ Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
+
+ // Create a new client when getAllFunctions() failed.
+ HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+ GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+ response.addToFunctions(
+ new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
+ Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+ Assert.assertEquals(response, clients.run(client -> client.getAllFunctions()));
+
+ Mockito.verify(clients).reconnect(hmsClient);
+ Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+ }
+
+ private HiveMetaStoreClient newClient() {
+ HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(hmsClient).when(clients).newClient();
+ return hmsClient;
+ }
+
+ private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+ HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+ Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+ return newClient;
+ }
}