You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/06/24 14:47:39 UTC
[hbase] branch master updated: HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 25c56caec88 HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)
25c56caec88 is described below
commit 25c56caec889b80a6db4d78b3302e81bfb406710
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Fri Jun 24 10:47:33 2022 -0400
HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../client/coprocessor/AggregationClient.java | 73 ++++++++++--
.../client/coprocessor/TestAggregationClient.java | 124 +++++++++++++++++++++
2 files changed, 185 insertions(+), 12 deletions(-)
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index c58ff3165c5..51a2e722955 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -80,6 +80,7 @@ public class AggregationClient implements Closeable {
// TODO: This class is not used. Move to examples?
private static final Logger log = LoggerFactory.getLogger(AggregationClient.class);
private final Connection connection;
+ private final boolean manageConnection;
/**
* An RpcController implementation for use here in this endpoint.
@@ -129,25 +130,73 @@ public class AggregationClient implements Closeable {
}
/**
- * Constructor with Conf object
- * @param cfg Configuration to use
+ * Creates AggregationClient with no underlying Connection. Users of this constructor should limit
+ * themselves to methods here which take a {@link Table} argument, such as
+ * {@link #rowCount(Table, ColumnInterpreter, Scan)}. Use of methods which instead take a
+ * TableName, such as {@link #rowCount(TableName, ColumnInterpreter, Scan)}, will throw an
+ * IOException.
+ */
+ public AggregationClient() {
+ this(null, false);
+ }
+
+ /**
+ * Creates AggregationClient using the passed in Connection, which will be used by methods taking
+ * a {@link TableName} to create the necessary {@link Table} for the call. The Connection is
+ * externally managed by the caller and will not be closed if {@link #close()} is called. There is
+ * no need to call {@link #close()} for AggregationClients created this way.
+ * @param connection the connection to use
+ */
+ public AggregationClient(Connection connection) {
+ this(connection, false);
+ }
+
+ /**
+ * Creates AggregationClient with internally managed Connection, which will be used by methods
+ * taking a {@link TableName} to create the necessary {@link Table} for the call. The Connection
+ * will immediately be created and will be closed when {@link #close()} is called. It's important to
+ * call {@link #close()} when done with this AggregationClient and to otherwise treat it as a
+ * shared Singleton.
+ * @param cfg Configuration to use to create connection
*/
public AggregationClient(Configuration cfg) {
+ // Create a connection on construction. Will use it making each of the calls below.
+ this(createConnection(cfg), true);
+ }
+
+ private static Connection createConnection(Configuration cfg) {
try {
- // Create a connection on construction. Will use it making each of the calls below.
- this.connection = ConnectionFactory.createConnection(cfg);
+ return ConnectionFactory.createConnection(cfg);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ private AggregationClient(Connection connection, boolean manageConnection) {
+ this.connection = connection;
+ this.manageConnection = manageConnection;
+ }
+
@Override
public void close() throws IOException {
- if (this.connection != null && !this.connection.isClosed()) {
+ if (manageConnection && this.connection != null && !this.connection.isClosed()) {
this.connection.close();
}
}
+ // visible for tests
+ boolean isClosed() {
+ return manageConnection && this.connection != null && this.connection.isClosed();
+ }
+
+ private Connection getConnection() throws IOException {
+ if (connection == null) {
+ throw new IOException(
+ "Connection not initialized. Use the correct constructor, or use the methods taking a Table");
+ }
+ return connection;
+ }
+
/**
* It gives the maximum value of a column for a given column family for the given range. In case
* qualifier is null, a max of all values for the given family is returned.
@@ -161,7 +210,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> R
max(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return max(table, ci, scan);
}
}
@@ -228,7 +277,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> R
min(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return min(table, ci, scan);
}
}
@@ -300,7 +349,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> long
rowCount(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return rowCount(table, ci, scan);
}
}
@@ -370,7 +419,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> S
sum(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return sum(table, ci, scan);
}
}
@@ -439,7 +488,7 @@ public class AggregationClient implements Closeable {
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return getAvgArgs(table, ci, scan);
}
}
@@ -625,7 +674,7 @@ public class AggregationClient implements Closeable {
*/
public <R, S, P extends Message, Q extends Message, T extends Message> double std(
final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return std(table, ci, scan);
}
}
@@ -729,7 +778,7 @@ public class AggregationClient implements Closeable {
*/
public <R, S, P extends Message, Q extends Message, T extends Message> R median(
final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
- try (Table table = connection.getTable(tableName)) {
+ try (Table table = getConnection().getTable(tableName)) {
return median(table, ci, scan);
}
}
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java
new file mode 100644
index 00000000000..2d8419c12cb
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAggregationClient {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAggregationClient.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("TestAggregationClient");
+
+ private static final byte[] CF = Bytes.toBytes("CF");
+
+ private static Connection CONN;
+
+ private static Table TABLE;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ AggregateImplementation.class.getName());
+ UTIL.startMiniCluster(1);
+ UTIL.createTable(TABLE_NAME, CF);
+ CONN = ConnectionFactory.createConnection(conf);
+ TABLE = CONN.getTable(TABLE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CONN.close();
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void itCreatesConnectionless() throws Throwable {
+ AggregationClient client = new AggregationClient();
+ assertFalse(client.isClosed());
+
+ try {
+ client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+ fail("Expected IOException");
+ } catch (Throwable e) {
+ assertTrue(e instanceof IOException);
+ assertTrue(e.getMessage().contains("Connection not initialized"));
+ }
+
+ client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+ client.close();
+ assertFalse(CONN.isClosed());
+ assertFalse(client.isClosed());
+
+ }
+
+ @Test
+ public void itCreatesExternalConnection() throws Throwable {
+ AggregationClient client = new AggregationClient(CONN);
+ assertFalse(client.isClosed());
+
+ client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+ client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+ client.close();
+ assertFalse(CONN.isClosed());
+ assertFalse(client.isClosed());
+ }
+
+ @Test
+ public void itCreatesManagedConnection() throws Throwable {
+ AggregationClient client = new AggregationClient(CONN.getConfiguration());
+ assertFalse(client.isClosed());
+
+ client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+ client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+ client.close();
+ assertFalse(CONN.isClosed());
+ assertTrue(client.isClosed());
+ }
+}