You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2020/08/12 03:13:10 UTC

[hbase] 01/02: Revert "BackPort HBASE-11554 Remove Reusable poolmap Rpc client type. (#2208)"

This is an automated email from the ASF dual-hosted git repository.

busbey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit b9c415ac6113b7a07373c0ccfbc5cb2f340410c1
Author: Sean Busbey <bu...@apache.org>
AuthorDate: Tue Aug 11 22:07:46 2020 -0500

    Revert "BackPort HBASE-11554 Remove Reusable poolmap Rpc client type. (#2208)"
    
    incorrect commit message and author
    
    This reverts commit c645cb54e61e2f5d2b61a407cfa03783b129e313.
---
 .../java/org/apache/hadoop/hbase/util/PoolMap.java | 49 +++++++++++-
 .../hadoop/hbase/util/TestReusablePoolMap.java     | 90 ++++++++++++++++++++++
 2 files changed, 138 insertions(+), 1 deletion(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
index 3cd38f3..f174c96 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
@@ -228,7 +228,7 @@ public class PoolMap<K, V> implements Map<K, V> {
   }
 
   public enum PoolType {
-    ThreadLocal, RoundRobin;
+    Reusable, ThreadLocal, RoundRobin;
 
     public static PoolType valueOf(String poolTypeName,
         PoolType defaultPoolType, PoolType... allowedPoolTypes) {
@@ -270,6 +270,8 @@ public class PoolMap<K, V> implements Map<K, V> {
 
   protected Pool<V> createPool() {
     switch (poolType) {
+    case Reusable:
+      return new ReusablePool<>(poolMaxSize);
     case RoundRobin:
       return new RoundRobinPool<>(poolMaxSize);
     case ThreadLocal:
@@ -279,6 +281,51 @@ public class PoolMap<K, V> implements Map<K, V> {
   }
 
   /**
+   * The <code>ReusablePool</code> represents a {@link PoolMap.Pool} that builds
+   * on the {@link ConcurrentLinkedQueue} class. Resources are checked out with
+   * {@link #get()}, at which point they are removed from this pool. When a
+   * resource is no longer required, it should be returned with {@link #put} in
+   * order to be reused.
+   *
+   * <p>
+   * {@link #maxSize} caps the number of idle resources held by this pool, not
+   * the number of consumers: {@link #put} silently drops the resource once the
+   * pool already holds that many entries. The size check and the insertion are
+   * separate steps, so the cap is best-effort under concurrent callers.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   */
+  @SuppressWarnings("serial")
+  public static class ReusablePool<R> extends ConcurrentLinkedQueue<R> implements Pool<R> {
+    private int maxSize; // upper bound on queued resources, enforced in put()
+
+    public ReusablePool(int maxSize) {
+      this.maxSize = maxSize;
+
+    }
+
+    @Override
+    public R get() {
+      return poll(); // removes and returns the head, or null when the pool is empty
+    }
+
+    @Override
+    public R put(R resource) {
+      if (super.size() < maxSize) { // size() is O(n); check-then-add is not atomic
+        add(resource);
+      }
+      return null; // always null, whether or not the resource was kept
+    }
+
+    @Override
+    public Collection<R> values() {
+      return this; // the pool itself (a live collection), not a copy
+    }
+  }
+
+  /**
    * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
    * stores its resources in an {@link ArrayList}. It load-balances access to
    * its resources by returning a different resource every time a given key is
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java
new file mode 100644
index 0000000..3fcaebb
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusablePoolMap.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MiscTests.class, SmallTests.class })
+public class TestReusablePoolMap extends PoolMapTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReusablePoolMap.class);
+
+  @Override
+  protected PoolType getPoolType() {
+    return PoolType.Reusable; // selects the pool implementation under test
+  }
+
+  @Test
+  public void testSingleThreadedClient() throws InterruptedException, ExecutionException {
+    Random rand = ThreadLocalRandom.current();
+    String randomKey = String.valueOf(rand.nextInt());
+    String randomValue = String.valueOf(rand.nextInt());
+    // As long as we poll values we put, the pool size should remain zero
+    runThread(randomKey, randomValue, randomValue); // defined in PoolMapTestBase (not shown here)
+    assertEquals(0, poolMap.size(randomKey));
+  }
+
+  @Test
+  public void testMultiThreadedClients() throws InterruptedException, ExecutionException {
+    Random rand = ThreadLocalRandom.current();
+    // As long as we poll values we put, the pool size should remain zero
+    for (int i = 0; i < POOL_SIZE; i++) { // phase 1: a fresh random key on every round
+      String randomKey = String.valueOf(rand.nextInt());
+      String randomValue = String.valueOf(rand.nextInt());
+      runThread(randomKey, randomValue, randomValue);
+      assertEquals(0, poolMap.size(randomKey));
+    }
+    poolMap.clear(); // start the single-key phase from an empty map
+    String randomKey = String.valueOf(rand.nextInt());
+    for (int i = 0; i < POOL_SIZE - 1; i++) { // phase 2: one shared key, POOL_SIZE - 1 rounds
+      String randomValue = String.valueOf(rand.nextInt());
+      runThread(randomKey, randomValue, randomValue);
+      assertEquals(0, poolMap.size(randomKey));
+    }
+    assertEquals(0, poolMap.size(randomKey));
+  }
+
+  @Test
+  public void testPoolCap() throws InterruptedException, ExecutionException {
+    Random rand = ThreadLocalRandom.current();
+    // Run twice POOL_SIZE rounds on one key (POOL_SIZE is presumably the cap - confirm in base)
+    String randomKey = String.valueOf(rand.nextInt());
+    List<String> randomValues = new ArrayList<>();
+    for (int i = 0; i < POOL_SIZE * 2; i++) {
+      String randomValue = String.valueOf(rand.nextInt());
+      randomValues.add(randomValue); // collected but never read back in this test
+      runThread(randomKey, randomValue, randomValue);
+    }
+    assertEquals(0, poolMap.size(randomKey)); // pool is expected to be empty after all rounds
+  }
+}