You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/19 20:37:42 UTC

svn commit: r1095163 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/util/PoolMap.java test/java/org/apache/hadoop/hbase/util/TestPoolMap.java

Author: stack
Date: Tue Apr 19 18:37:42 2011
New Revision: 1095163

URL: http://svn.apache.org/viewvc?rev=1095163&view=rev
Log:
HBASE-2939 Allow Client-Side Connection Pooling

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java?rev=1095163&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java Tue Apr 19 18:37:42 2011
@@ -0,0 +1,431 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * The <code>PoolMap</code> maps a key to a collection of values, the elements
+ * of which are managed by a pool. In effect, that collection acts as a shared
+ * pool of resources, access to which is closely controlled as per the semantics
+ * of the pool.
+ *
+ * <p>
+ * In case the size of the pool is set to a non-zero positive number, that is
+ * used to cap the number of resources that a pool may contain for any given
+ * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
+ * </p>
+ *
+ * @param <K>
+ *          the type of the key to the resource
+ * @param <V>
+ *          the type of the resource being pooled
+ */
+public class PoolMap<K, V> implements Map<K, V> {
+  private PoolType poolType;
+
+  private int poolMaxSize;
+
+  private Map<K, Pool<V>> pools = Collections
+      .synchronizedMap(new HashMap<K, Pool<V>>());
+
+  /**
+   * Creates an empty pool map.
+   *
+   * @param poolType
+   *          the kind of pool that is created for each key
+   * @param poolMaxSize
+   *          the size limit handed to every pool this map creates
+   */
+  public PoolMap(PoolType poolType, int poolMaxSize) {
+    this.poolMaxSize = poolMaxSize;
+    this.poolType = poolType;
+  }
+
+  /**
+   * Asks the pool registered under <code>key</code> for a resource.
+   *
+   * @return whatever the pool hands out, or <code>null</code> when no pool is
+   *         registered for the key
+   */
+  @Override
+  public V get(Object key) {
+    final Pool<V> keyedPool = pools.get(key);
+    if (keyedPool == null) {
+      return null;
+    }
+    return keyedPool.get();
+  }
+
+  /**
+   * Offers <code>value</code> to the pool registered under <code>key</code>,
+   * creating that pool first if the key has none yet.
+   *
+   * @return whatever the pool's own <code>put</code> returns, or
+   *         <code>null</code> when no pool could be created
+   */
+  @Override
+  public V put(K key, V value) {
+    Pool<V> pool;
+    // The synchronized wrapper only makes individual calls atomic. The lookup
+    // and the insert have to run under one lock, otherwise two threads can
+    // each create a pool for the same key and one of the values is lost.
+    synchronized (pools) {
+      pool = pools.get(key);
+      if (pool == null) {
+        pools.put(key, pool = createPool());
+      }
+    }
+    return pool != null ? pool.put(value) : null;
+  }
+
+  /**
+   * Unregisters the pool stored under <code>key</code> and clears it.
+   *
+   * @return always <code>null</code>
+   */
+  @Override
+  public V remove(Object key) {
+    final Pool<V> evicted = pools.remove(key);
+    if (evicted == null) {
+      return null;
+    }
+    evicted.clear();
+    return null;
+  }
+
+  /**
+   * Removes a single resource from the pool registered under <code>key</code>.
+   *
+   * @return the result of the pool's own <code>remove</code>, or
+   *         <code>false</code> when the key has no pool
+   */
+  public boolean remove(K key, V value) {
+    final Pool<V> keyedPool = pools.get(key);
+    if (keyedPool == null) {
+      return false;
+    }
+    return keyedPool.remove(value);
+  }
+
+  /**
+   * Collects the resources reported by every pool into a new list.
+   *
+   * @return a fresh collection; changes to it do not affect the pools
+   */
+  @Override
+  public Collection<V> values() {
+    Collection<V> values = new ArrayList<V>();
+    // Views of a Collections.synchronizedMap must be iterated while holding
+    // the map's lock, otherwise a concurrent put/remove can break the
+    // iteration.
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        Collection<V> poolValues = pool.values();
+        if (poolValues != null) {
+          values.addAll(poolValues);
+        }
+      }
+    }
+    return values;
+  }
+
+  /**
+   * @return <code>true</code> when no key has a pool registered
+   */
+  @Override
+  public boolean isEmpty() {
+    final boolean noPools = pools.isEmpty();
+    return noPools;
+  }
+
+  /**
+   * @return the number of keys that have a pool registered (not the number of
+   *         pooled resources)
+   */
+  @Override
+  public int size() {
+    final int poolCount = pools.size();
+    return poolCount;
+  }
+
+  /**
+   * @return the size reported by the pool registered under <code>key</code>,
+   *         or 0 when the key has no pool
+   */
+  public int size(K key) {
+    final Pool<V> keyedPool = pools.get(key);
+    if (keyedPool == null) {
+      return 0;
+    }
+    return keyedPool.size();
+  }
+
+  /**
+   * @return <code>true</code> when a pool is registered under <code>key</code>
+   */
+  @Override
+  public boolean containsKey(Object key) {
+    final boolean registered = pools.containsKey(key);
+    return registered;
+  }
+
+  /**
+   * Reports whether any pool currently lists <code>value</code> among its
+   * resources.
+   *
+   * <p>
+   * The check inspects each pool's <code>values()</code> rather than calling
+   * <code>get()</code>: a lookup must not check a resource out of a pool or
+   * advance a pool's cursor.
+   * </p>
+   *
+   * @return <code>false</code> for a <code>null</code> value or when no pool
+   *         lists the value
+   */
+  @Override
+  public boolean containsValue(Object value) {
+    if (value == null) {
+      return false;
+    }
+    // Views of a Collections.synchronizedMap must be iterated under its lock.
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        Collection<V> poolValues = pool.values();
+        if (poolValues != null && poolValues.contains(value)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Feeds every mapping of <code>map</code> through
+   * {@link #put(Object, Object)}, one entry at a time.
+   */
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map) {
+    for (Map.Entry<? extends K, ? extends V> mapping : map.entrySet()) {
+      final K mappingKey = mapping.getKey();
+      final V mappingValue = mapping.getValue();
+      put(mappingKey, mappingValue);
+    }
+  }
+
+  /**
+   * Clears every registered pool and then forgets all keys.
+   */
+  @Override
+  public void clear() {
+    // Hold the map's lock for the whole operation: the values() view of a
+    // Collections.synchronizedMap must not be iterated unlocked, and no pool
+    // registered in between may escape being cleared.
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        pool.clear();
+      }
+      pools.clear();
+    }
+  }
+
+  /**
+   * @return the key view of the underlying key-to-pool map
+   */
+  @Override
+  public Set<K> keySet() {
+    final Set<K> registeredKeys = pools.keySet();
+    return registeredKeys;
+  }
+
+  /**
+   * Builds a snapshot containing one entry per (key, pooled resource) pair.
+   *
+   * <p>
+   * Calling <code>setValue</code> on an entry offers the new value to the pool
+   * the entry was taken from; the entry itself keeps reporting the value it
+   * was created with.
+   * </p>
+   *
+   * @return a newly built set, never <code>null</code>
+   */
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
+    // Views of a Collections.synchronizedMap must be iterated under its lock.
+    synchronized (pools) {
+      for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
+        final K poolKey = poolEntry.getKey();
+        final Pool<V> pool = poolEntry.getValue();
+        // Check before dereferencing: a key may be mapped to no pool.
+        if (pool == null) {
+          continue;
+        }
+        Collection<V> poolValues = pool.values();
+        if (poolValues == null) {
+          continue;
+        }
+        for (final V poolValue : poolValues) {
+          entries.add(new Map.Entry<K, V>() {
+            @Override
+            public K getKey() {
+              return poolKey;
+            }
+
+            @Override
+            public V getValue() {
+              return poolValue;
+            }
+
+            @Override
+            public V setValue(V value) {
+              return pool.put(value);
+            }
+          });
+        }
+      }
+    }
+    return entries;
+  }
+
+  /**
+   * Contract implemented by every per-key pool held in a {@link PoolMap}.
+   *
+   * @param <R>
+   *          the type of the pooled resource
+   */
+  protected interface Pool<R> {
+    /** Hands out a resource according to the pool's own policy. */
+    R get();
+
+    /** Offers a resource to the pool. */
+    R put(R resource);
+
+    /** Removes the given resource; the result tells whether it was removed. */
+    boolean remove(R resource);
+
+    /** Discards the resources held by the pool. */
+    void clear();
+
+    /** Lists the resources the pool reports as held. */
+    Collection<R> values();
+
+    /** Reports the pool's current size. */
+    int size();
+  }
+
+  /**
+   * The pool implementations a {@link PoolMap} can create, together with
+   * helpers for resolving a pool type from a loosely formatted name.
+   */
+  public enum PoolType {
+    Reusable, ThreadLocal, RoundRobin;
+
+    /**
+     * Resolves a pool type from a name, restricted to a set of permitted
+     * types.
+     *
+     * @param poolTypeName
+     *          the name to resolve; matched with {@link #fuzzyMatch(String)}
+     * @param defaultPoolType
+     *          the type returned when the name matches nothing or matches a
+     *          type that is not permitted; it is always permitted itself
+     * @param allowedPoolTypes
+     *          further permitted types; may be empty or <code>null</code>
+     * @return the matched type if it is permitted, otherwise
+     *         <code>defaultPoolType</code>
+     */
+    public static PoolType valueOf(String poolTypeName,
+        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
+      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
+      if (poolType == null || poolType.equals(defaultPoolType)) {
+        return defaultPoolType;
+      }
+      if (allowedPoolTypes != null) {
+        for (PoolType allowedPoolType : allowedPoolTypes) {
+          if (poolType.equals(allowedPoolType)) {
+            return poolType;
+          }
+        }
+      }
+      return defaultPoolType;
+    }
+
+    /**
+     * Normalizes a name for comparison: hyphens are dropped, surrounding
+     * whitespace is trimmed and the result is lower-cased.
+     *
+     * @return the normalized name, or the empty string for <code>null</code>
+     */
+    public static String fuzzyNormalize(String name) {
+      if (name == null) {
+        return "";
+      }
+      // Lower-case with a fixed locale: under e.g. a Turkish default locale
+      // "ROBIN" would otherwise become "rob\u0131n" and never match.
+      return name.replaceAll("-", "").trim()
+          .toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Finds the pool type whose normalized name equals the normalized
+     * <code>name</code>.
+     *
+     * @return the matching type, or <code>null</code> when none matches
+     */
+    public static PoolType fuzzyMatch(String name) {
+      // The input's normalized form does not change between iterations.
+      final String normalizedName = fuzzyNormalize(name);
+      for (PoolType poolType : values()) {
+        if (normalizedName.equals(fuzzyNormalize(poolType.name()))) {
+          return poolType;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Instantiates the pool implementation selected by this map's pool type,
+   * passing it the configured maximum size.
+   *
+   * @return the new pool, or <code>null</code> when the pool type matches none
+   *         of the handled cases
+   */
+  protected Pool<V> createPool() {
+    Pool<V> created = null;
+    switch (poolType) {
+    case Reusable:
+      created = new ReusablePool<V>(poolMaxSize);
+      break;
+    case RoundRobin:
+      created = new RoundRobinPool<V>(poolMaxSize);
+      break;
+    case ThreadLocal:
+      created = new ThreadLocalPool<V>(poolMaxSize);
+      break;
+    }
+    return created;
+  }
+
+  /**
+   * A {@link PoolMap.Pool} backed by a {@link LinkedList}. Getting a resource
+   * takes it out of the pool (the head of the list is polled); a resource that
+   * is no longer needed has to be put back before it can be handed out again.
+   *
+   * <p>
+   * A resource is only accepted while the pool holds fewer than
+   * {@link #maxSize} elements, so a {@link #maxSize} of
+   * {@link Integer#MAX_VALUE} makes the pool effectively unbounded.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   */
+  @SuppressWarnings("serial")
+  public class ReusablePool<R> extends LinkedList<R> implements Pool<R> {
+    private int maxSize;
+
+    public ReusablePool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    /** Takes the head resource out of the pool; null when the pool is empty. */
+    @Override
+    public R get() {
+      final R head = poll();
+      return head;
+    }
+
+    /** Appends the resource unless the pool is at capacity; returns null. */
+    @Override
+    public R put(R resource) {
+      final boolean hasRoom = size() < maxSize;
+      if (hasRoom) {
+        add(resource);
+      }
+      return null;
+    }
+
+    /** The pool is its own collection of values. */
+    @Override
+    public Collection<R> values() {
+      return this;
+    }
+  }
+
+  /**
+   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
+   * stores its resources in an {@link ArrayList}. It load-balances access to
+   * its resources by returning a different resource every time a given key is
+   * looked up.
+   *
+   * <p>
+   * Until the pool has been filled up to {@link #maxSize}, {@link #get()}
+   * returns <code>null</code>, which prompts the caller to create a new
+   * resource and put it into the pool. A resource is only accepted while the
+   * pool holds fewer than {@link #maxSize} elements, so a {@link #maxSize} of
+   * {@link Integer#MAX_VALUE} makes the pool effectively unbounded.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   *
+   */
+  @SuppressWarnings("serial")
+  class RoundRobinPool<R> extends ArrayList<R> implements Pool<R> {
+    private int maxSize;
+    private int nextResource = 0;
+
+    public RoundRobinPool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    /** Appends the resource unless the pool is at capacity; returns null. */
+    @Override
+    public R put(R resource) {
+      if (size() < maxSize) {
+        add(resource);
+      }
+      return null;
+    }
+
+    /**
+     * Returns the next resource in rotation once the pool is full.
+     *
+     * @return <code>null</code> while the pool is empty or not yet full,
+     *         otherwise the resource at the rotating cursor
+     */
+    @Override
+    public R get() {
+      // The emptiness check covers a non-positive maxSize: such a pool never
+      // accepts a resource, and the modulo below would divide by zero.
+      if (isEmpty() || size() < maxSize) {
+        return null;
+      }
+      nextResource %= size();
+      R resource = get(nextResource++);
+      return resource;
+    }
+
+    /** The pool is its own collection of values. */
+    @Override
+    public Collection<R> values() {
+      return this;
+    }
+
+  }
+
+  /**
+   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
+   * builds on the {@link ThreadLocal} class. It essentially binds the resource
+   * to the thread from which it is accessed.
+   *
+   * <p>
+   * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
+   * the pool is bounded only by the number of threads that add resources to
+   * this pool. Otherwise, it caps the number of threads that can set a value on
+   * this {@link ThreadLocal} instance to the (non-zero positive) value
+   * specified in {@link #maxSize}.
+   * </p>
+   *
+   *
+   * @param <R>
+   *          the type of the resource
+   */
+  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
+    private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>();
+
+    private int maxSize;
+
+    public ThreadLocalPool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    @Override
+    public R put(R resource) {
+      R previousResource = get();
+      if (previousResource == null) {
+        AtomicInteger poolSize = poolSizes.get(this);
+        if (poolSize == null) {
+          poolSizes.put(this, poolSize = new AtomicInteger(0));
+        }
+        if (poolSize.intValue() >= maxSize) {
+          return null;
+        }
+        poolSize.incrementAndGet();
+      }
+      this.set(resource);
+      return previousResource;
+    }
+
+    @Override
+    public void remove() {
+      super.remove();
+      AtomicInteger poolSize = poolSizes.get(this);
+      if (poolSize != null) {
+        poolSize.decrementAndGet();
+      }
+    }
+
+    @Override
+    public int size() {
+      AtomicInteger poolSize = poolSizes.get(this);
+      return poolSize != null ? poolSize.get() : 0;
+    }
+
+    @Override
+    public boolean remove(R resource) {
+      R previousResource = super.get();
+      if (resource != null && resource.equals(previousResource)) {
+        remove();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public void clear() {
+      super.remove();
+    }
+
+    @Override
+    public Collection<R> values() {
+      List<R> values = new ArrayList<R>();
+      values.add(get());
+      return values;
+    }
+  }
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java?rev=1095163&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java Tue Apr 19 18:37:42 2011
@@ -0,0 +1,235 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+
+public class TestPoolMap {
+  public abstract static class TestPoolType extends TestCase {
+    protected PoolMap<String, String> poolMap;
+    protected Random random = new Random();
+
+    protected static final int POOL_SIZE = 3;
+
+    @Override
+    protected void setUp() throws Exception {
+      this.poolMap = new PoolMap<String, String>(getPoolType(), POOL_SIZE);
+    }
+
+    protected abstract PoolType getPoolType();
+
+    @Override
+    protected void tearDown() throws Exception {
+      this.poolMap.clear();
+    }
+
+    protected void runThread(final String randomKey, final String randomValue,
+        final String expectedValue) throws InterruptedException {
+      final AtomicBoolean matchFound = new AtomicBoolean(false);
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          poolMap.put(randomKey, randomValue);
+          String actualValue = poolMap.get(randomKey);
+          matchFound.set(expectedValue == null ? actualValue == null
+              : expectedValue.equals(actualValue));
+        }
+      });
+      thread.start();
+      thread.join();
+      assertTrue(matchFound.get());
+    }
+  }
+
+  /** Tests for {@link PoolMap} backed by round-robin pools. */
+  public static class TestRoundRobinPoolType extends TestPoolType {
+    @Override
+    protected PoolType getPoolType() {
+      return PoolType.RoundRobin;
+    }
+
+    public void testSingleThreadedClient() throws InterruptedException,
+        ExecutionException {
+      String key = String.valueOf(random.nextInt());
+      String value = String.valueOf(random.nextInt());
+      // A pool that is not yet full answers null, which makes the caller
+      // create a new value to populate the pool with.
+      runThread(key, value, null);
+      assertEquals(1, poolMap.size(key));
+    }
+
+    public void testMultiThreadedClients() throws InterruptedException,
+        ExecutionException {
+      for (int i = 0; i < POOL_SIZE; i++) {
+        String key = String.valueOf(random.nextInt());
+        String value = String.valueOf(random.nextInt());
+        // Not full yet, so null comes back
+        runThread(key, value, null);
+        // Distinct keys: every pool holds exactly one value
+        assertEquals(1, poolMap.size(key));
+      }
+      poolMap.clear();
+      String sharedKey = String.valueOf(random.nextInt());
+      for (int i = 0; i < POOL_SIZE - 1; i++) {
+        String value = String.valueOf(random.nextInt());
+        // Not full yet, so null comes back
+        runThread(sharedKey, value, null);
+        // Same key every time: the pool grows by one per put
+        assertEquals(i + 1, poolMap.size(sharedKey));
+      }
+      // In total the pool holds as many values as were put
+      assertEquals(POOL_SIZE - 1, poolMap.size(sharedKey));
+    }
+
+    public void testPoolCap() throws InterruptedException, ExecutionException {
+      String key = String.valueOf(random.nextInt());
+      List<String> putValues = new ArrayList<String>();
+      for (int i = 0; i < POOL_SIZE * 2; i++) {
+        String value = String.valueOf(random.nextInt());
+        putValues.add(value);
+        String expected;
+        if (i < POOL_SIZE - 1) {
+          // Not full yet, so null comes back
+          expected = null;
+        } else {
+          // Once the pool is full, earlier values come back in round-robin
+          // order
+          expected = putValues.get((i - POOL_SIZE + 1) % POOL_SIZE);
+        }
+        runThread(key, value, expected);
+      }
+      assertEquals(POOL_SIZE, poolMap.size(key));
+    }
+
+  }
+
+  /** Tests for {@link PoolMap} backed by thread-local pools. */
+  public static class TestThreadLocalPoolType extends TestPoolType {
+    @Override
+    protected PoolType getPoolType() {
+      return PoolType.ThreadLocal;
+    }
+
+    public void testSingleThreadedClient() throws InterruptedException,
+        ExecutionException {
+      String key = String.valueOf(random.nextInt());
+      String value = String.valueOf(random.nextInt());
+      // While the pool has room, a thread reads back what it put
+      runThread(key, value, value);
+      assertEquals(1, poolMap.size(key));
+    }
+
+    public void testMultiThreadedClients() throws InterruptedException,
+        ExecutionException {
+      // While the pool has room, a thread reads back what it put
+      for (int i = 0; i < POOL_SIZE; i++) {
+        String key = String.valueOf(random.nextInt());
+        String value = String.valueOf(random.nextInt());
+        runThread(key, value, value);
+        assertEquals(1, poolMap.size(key));
+      }
+      String sharedKey = String.valueOf(random.nextInt());
+      for (int i = 0; i < POOL_SIZE; i++) {
+        String value = String.valueOf(random.nextInt());
+        runThread(sharedKey, value, value);
+        assertEquals(i + 1, poolMap.size(sharedKey));
+      }
+    }
+
+    public void testPoolCap() throws InterruptedException, ExecutionException {
+      String key = String.valueOf(random.nextInt());
+      for (int i = 0; i < POOL_SIZE * 2; i++) {
+        String value = String.valueOf(random.nextInt());
+        // Once the pool has filled up, new values are no longer accepted
+        String expected = (i < POOL_SIZE) ? value : null;
+        runThread(key, value, expected);
+      }
+    }
+
+  }
+
+  /** Tests for {@link PoolMap} backed by reusable pools. */
+  public static class TestReusablePoolType extends TestPoolType {
+    @Override
+    protected PoolType getPoolType() {
+      return PoolType.Reusable;
+    }
+
+    public void testSingleThreadedClient() throws InterruptedException,
+        ExecutionException {
+      String key = String.valueOf(random.nextInt());
+      String value = String.valueOf(random.nextInt());
+      // Every value put is polled right away, so the pool stays empty
+      runThread(key, value, value);
+      assertEquals(0, poolMap.size(key));
+    }
+
+    public void testMultiThreadedClients() throws InterruptedException,
+        ExecutionException {
+      // Every value put is polled right away, so the pools stay empty
+      for (int i = 0; i < POOL_SIZE; i++) {
+        String key = String.valueOf(random.nextInt());
+        String value = String.valueOf(random.nextInt());
+        runThread(key, value, value);
+        assertEquals(0, poolMap.size(key));
+      }
+      poolMap.clear();
+      String sharedKey = String.valueOf(random.nextInt());
+      for (int i = 0; i < POOL_SIZE - 1; i++) {
+        String value = String.valueOf(random.nextInt());
+        runThread(sharedKey, value, value);
+        assertEquals(0, poolMap.size(sharedKey));
+      }
+      assertEquals(0, poolMap.size(sharedKey));
+    }
+
+    public void testPoolCap() throws InterruptedException, ExecutionException {
+      // Every value put is polled right away, so the pool stays empty
+      String key = String.valueOf(random.nextInt());
+      List<String> putValues = new ArrayList<String>();
+      for (int i = 0; i < POOL_SIZE * 2; i++) {
+        String value = String.valueOf(random.nextInt());
+        putValues.add(value);
+        runThread(key, value, value);
+      }
+      assertEquals(0, poolMap.size(key));
+    }
+
+  }
+
+  /**
+   * Assembles the suite: round-robin tests first, then thread-local, then
+   * reusable.
+   */
+  public static Test suite() {
+    final TestSuite allPoolTypes = new TestSuite();
+    allPoolTypes.addTestSuite(TestRoundRobinPoolType.class);
+    allPoolTypes.addTestSuite(TestThreadLocalPoolType.class);
+    allPoolTypes.addTestSuite(TestReusablePoolType.class);
+    return allPoolTypes;
+  }
+
+}