You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/06/24 14:47:39 UTC

[hbase] branch master updated: HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c56caec88 HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)
25c56caec88 is described below

commit 25c56caec889b80a6db4d78b3302e81bfb406710
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Fri Jun 24 10:47:33 2022 -0400

    HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../client/coprocessor/AggregationClient.java      |  73 ++++++++++--
 .../client/coprocessor/TestAggregationClient.java  | 124 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 12 deletions(-)

diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index c58ff3165c5..51a2e722955 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -80,6 +80,7 @@ public class AggregationClient implements Closeable {
   // TODO: This class is not used. Move to examples?
   private static final Logger log = LoggerFactory.getLogger(AggregationClient.class);
   private final Connection connection;
+  private final boolean manageConnection;
 
   /**
    * An RpcController implementation for use here in this endpoint.
@@ -129,25 +130,73 @@ public class AggregationClient implements Closeable {
   }
 
   /**
-   * Constructor with Conf object
-   * @param cfg Configuration to use
+   * Creates AggregationClient with no underlying Connection. Users of this constructor should limit
+   * themselves to methods here which take a {@link Table} argument, such as
+   * {@link #rowCount(Table, ColumnInterpreter, Scan)}. Use of methods which instead take a
+   * TableName, such as {@link #rowCount(TableName, ColumnInterpreter, Scan)}, will throw an
+   * IOException.
+   */
+  public AggregationClient() {
+    this(null, false);
+  }
+
+  /**
+   * Creates AggregationClient using the passed in Connection, which will be used by methods taking
+   * a {@link TableName} to create the necessary {@link Table} for the call. The Connection is
+   * externally managed by the caller and will not be closed if {@link #close()} is called. There is
+   * no need to call {@link #close()} for AggregationClients created this way.
+   * @param connection the connection to use
+   */
+  public AggregationClient(Connection connection) {
+    this(connection, false);
+  }
+
+  /**
+   * Creates AggregationClient with internally managed Connection, which will be used by methods
+   * taking a {@link TableName} to create the necessary {@link Table} for the call. The Connection
+   * is created immediately and is closed when {@link #close()} is called. It's important to
+   * call {@link #close()} when done with this AggregationClient and to otherwise treat it as a
+   * shared Singleton.
+   * @param cfg Configuration to use to create connection
    */
   public AggregationClient(Configuration cfg) {
+    // Create a connection on construction; it is used for each of the calls below.
+    this(createConnection(cfg), true);
+  }
+
+  private static Connection createConnection(Configuration cfg) {
     try {
-      // Create a connection on construction. Will use it making each of the calls below.
-      this.connection = ConnectionFactory.createConnection(cfg);
+      return ConnectionFactory.createConnection(cfg);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  private AggregationClient(Connection connection, boolean manageConnection) {
+    this.connection = connection;
+    this.manageConnection = manageConnection;
+  }
+
   @Override
   public void close() throws IOException {
-    if (this.connection != null && !this.connection.isClosed()) {
+    if (manageConnection && this.connection != null && !this.connection.isClosed()) {
       this.connection.close();
     }
   }
 
+  // visible for tests
+  boolean isClosed() {
+    return manageConnection && this.connection != null && this.connection.isClosed();
+  }
+
+  private Connection getConnection() throws IOException {
+    if (connection == null) {
+      throw new IOException(
+        "Connection not initialized. Use the correct constructor, or use the methods taking a Table");
+    }
+    return connection;
+  }
+
   /**
    * It gives the maximum value of a column for a given column family for the given range. In case
    * qualifier is null, a max of all values for the given family is returned.
@@ -161,7 +210,7 @@ public class AggregationClient implements Closeable {
   public <R, S, P extends Message, Q extends Message, T extends Message> R
     max(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
       throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return max(table, ci, scan);
     }
   }
@@ -228,7 +277,7 @@ public class AggregationClient implements Closeable {
   public <R, S, P extends Message, Q extends Message, T extends Message> R
     min(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
       throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return min(table, ci, scan);
     }
   }
@@ -300,7 +349,7 @@ public class AggregationClient implements Closeable {
   public <R, S, P extends Message, Q extends Message, T extends Message> long
     rowCount(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
       throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return rowCount(table, ci, scan);
     }
   }
@@ -370,7 +419,7 @@ public class AggregationClient implements Closeable {
   public <R, S, P extends Message, Q extends Message, T extends Message> S
     sum(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
       throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return sum(table, ci, scan);
     }
   }
@@ -439,7 +488,7 @@ public class AggregationClient implements Closeable {
   private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
     final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
     throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return getAvgArgs(table, ci, scan);
     }
   }
@@ -625,7 +674,7 @@ public class AggregationClient implements Closeable {
    */
   public <R, S, P extends Message, Q extends Message, T extends Message> double std(
     final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return std(table, ci, scan);
     }
   }
@@ -729,7 +778,7 @@ public class AggregationClient implements Closeable {
    */
   public <R, S, P extends Message, Q extends Message, T extends Message> R median(
     final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
-    try (Table table = connection.getTable(tableName)) {
+    try (Table table = getConnection().getTable(tableName)) {
       return median(table, ci, scan);
     }
   }
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java
new file mode 100644
index 00000000000..2d8419c12cb
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/coprocessor/TestAggregationClient.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAggregationClient {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAggregationClient.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestAggregationClient");
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  private static Connection CONN;
+
+  private static Table TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AggregateImplementation.class.getName());
+    UTIL.startMiniCluster(1);
+    UTIL.createTable(TABLE_NAME, CF);
+    CONN = ConnectionFactory.createConnection(conf);
+    TABLE = CONN.getTable(TABLE_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void itCreatesConnectionless() throws Throwable {
+    AggregationClient client = new AggregationClient();
+    assertFalse(client.isClosed());
+
+    try {
+      client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+      fail("Expected IOException");
+    } catch (Throwable e) {
+      assertTrue(e instanceof IOException);
+      assertTrue(e.getMessage().contains("Connection not initialized"));
+    }
+
+    client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+    client.close();
+    assertFalse(CONN.isClosed());
+    assertFalse(client.isClosed());
+
+  }
+
+  @Test
+  public void itCreatesExternalConnection() throws Throwable {
+    AggregationClient client = new AggregationClient(CONN);
+    assertFalse(client.isClosed());
+
+    client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+    client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+    client.close();
+    assertFalse(CONN.isClosed());
+    assertFalse(client.isClosed());
+  }
+
+  @Test
+  public void itCreatesManagedConnection() throws Throwable {
+    AggregationClient client = new AggregationClient(CONN.getConfiguration());
+    assertFalse(client.isClosed());
+
+    client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
+    client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
+
+    client.close();
+    assertFalse(CONN.isClosed());
+    assertTrue(client.isClosed());
+  }
+}