You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/19 20:37:42 UTC
svn commit: r1095163 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/util/PoolMap.java
test/java/org/apache/hadoop/hbase/util/TestPoolMap.java
Author: stack
Date: Tue Apr 19 18:37:42 2011
New Revision: 1095163
URL: http://svn.apache.org/viewvc?rev=1095163&view=rev
Log:
HBASE-2939 Allow Client-Side Connection Pooling
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java?rev=1095163&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java Tue Apr 19 18:37:42 2011
@@ -0,0 +1,431 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * The <code>PoolMap</code> maps a key to a collection of values, the elements
+ * of which are managed by a pool. In effect, that collection acts as a shared
+ * pool of resources, access to which is closely controlled as per the semantics
+ * of the pool.
+ *
+ * <p>
+ * In case the size of the pool is set to a non-zero positive number, that is
+ * used to cap the number of resources that a pool may contain for any given
+ * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool.
+ * </p>
+ *
+ * @param <K>
+ * the type of the key to the resource
+ * @param <V>
+ * the type of the resource being pooled
+ */
+public class PoolMap<K, V> implements Map<K, V> {
+ private PoolType poolType;
+
+ private int poolMaxSize;
+
+ private Map<K, Pool<V>> pools = Collections
+ .synchronizedMap(new HashMap<K, Pool<V>>());
+
+  /**
+   * Creates a pool map whose per-key pools all share one type and capacity.
+   *
+   * @param poolType
+   *          the kind of pool created for each new key
+   * @param poolMaxSize
+   *          the capacity handed to every pool this map creates
+   */
+  public PoolMap(PoolType poolType, int poolMaxSize) {
+    this.poolMaxSize = poolMaxSize;
+    this.poolType = poolType;
+  }
+
+  /**
+   * Looks up the pool registered under <code>key</code> and asks it for a
+   * resource.
+   *
+   * @param key
+   *          the key whose pool should be consulted
+   * @return whatever the pool's <code>get()</code> yields, or
+   *         <code>null</code> when no pool is registered for the key
+   */
+  @Override
+  public V get(Object key) {
+    Pool<V> keyPool = pools.get(key);
+    if (keyPool == null) {
+      return null;
+    }
+    return keyPool.get();
+  }
+
+  /**
+   * Offers <code>value</code> to the pool registered under <code>key</code>,
+   * creating and registering that pool first when the key is new.
+   *
+   * @param key
+   *          the key whose pool receives the value
+   * @param value
+   *          the resource to hand to the pool
+   * @return the result of the pool's <code>put</code>, or <code>null</code>
+   *         when no pool could be created
+   */
+  @Override
+  public V put(K key, V value) {
+    Pool<V> keyPool = pools.get(key);
+    if (keyPool == null) {
+      keyPool = createPool();
+      pools.put(key, keyPool);
+    }
+    if (keyPool == null) {
+      return null;
+    }
+    return keyPool.put(value);
+  }
+
+  /**
+   * Unregisters the pool stored under <code>key</code> and clears it.
+   *
+   * @param key
+   *          the key whose pool is dropped
+   * @return always <code>null</code>; no previous value is reported
+   */
+  @Override
+  public V remove(Object key) {
+    Pool<V> removedPool = pools.remove(key);
+    if (removedPool == null) {
+      return null;
+    }
+    removedPool.clear();
+    return null;
+  }
+
+  /**
+   * Removes a single resource from the pool registered under
+   * <code>key</code>, leaving the pool itself registered.
+   *
+   * @param key
+   *          the key whose pool is searched
+   * @param value
+   *          the resource to remove
+   * @return the result of the pool's <code>remove</code>, or
+   *         <code>false</code> when the key has no pool
+   */
+  public boolean remove(K key, V value) {
+    Pool<V> keyPool = pools.get(key);
+    if (keyPool == null) {
+      return false;
+    }
+    return keyPool.remove(value);
+  }
+
+  /**
+   * Gathers the resources reported by every pool into one new list.
+   *
+   * @return a freshly allocated collection holding the contents of each
+   *         pool's <code>values()</code>; pools reporting <code>null</code>
+   *         are skipped
+   */
+  @Override
+  public Collection<V> values() {
+    Collection<V> collected = new ArrayList<V>();
+    for (Pool<V> each : pools.values()) {
+      Collection<V> held = each.values();
+      if (held == null) {
+        continue;
+      }
+      collected.addAll(held);
+    }
+    return collected;
+  }
+
+ /**
+ * Reports whether this map has no pools at all.
+ *
+ * @return <code>true</code> when no key has a pool registered; a key whose
+ * pool currently holds no resources still counts as present
+ */
+ @Override
+ public boolean isEmpty() {
+ return pools.isEmpty();
+ }
+
+ /**
+ * Returns the number of keys that have a pool registered. This is not the
+ * number of pooled resources; use <code>size(K)</code> for a per-key count.
+ *
+ * @return the number of registered pools
+ */
+ @Override
+ public int size() {
+ return pools.size();
+ }
+
+  /**
+   * Returns the size reported by the pool registered under <code>key</code>.
+   *
+   * @param key
+   *          the key whose pool is measured
+   * @return the pool's <code>size()</code>, or <code>0</code> when the key has
+   *         no pool
+   */
+  public int size(K key) {
+    Pool<V> keyPool = pools.get(key);
+    if (keyPool == null) {
+      return 0;
+    }
+    return keyPool.size();
+  }
+
+ /**
+ * Reports whether a pool is registered under the given key, regardless of
+ * whether that pool currently holds any resource.
+ *
+ * @param key
+ * the key to look up
+ * @return <code>true</code> when the key has a pool
+ */
+ @Override
+ public boolean containsKey(Object key) {
+ return pools.containsKey(key);
+ }
+
+  /**
+   * Reports whether any pool currently holds a resource equal to
+   * <code>value</code>.
+   *
+   * <p>
+   * Each pool is inspected through <code>values()</code> rather than
+   * <code>get()</code>, because <code>get()</code> is not a pure query:
+   * <code>ReusablePool</code> removes the resource it hands out and
+   * <code>RoundRobinPool</code> advances its cursor and answers
+   * <code>null</code> until it is full.
+   * </p>
+   *
+   * @param value
+   *          the resource to look for
+   * @return <code>true</code> when some pool contains an equal resource;
+   *         always <code>false</code> for <code>null</code>
+   */
+  @Override
+  public boolean containsValue(Object value) {
+    if (value == null) {
+      return false;
+    }
+    for (Pool<V> pool : pools.values()) {
+      Collection<V> poolValues = pool.values();
+      if (poolValues != null && poolValues.contains(value)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Feeds every mapping of <code>map</code> through
+   * {@link #put(Object, Object)}, one entry at a time.
+   *
+   * @param map
+   *          the mappings to add
+   */
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map) {
+    for (Map.Entry<? extends K, ? extends V> mapping : map.entrySet()) {
+      K mappingKey = mapping.getKey();
+      V mappingValue = mapping.getValue();
+      put(mappingKey, mappingValue);
+    }
+  }
+
+  /**
+   * Clears every registered pool and then unregisters all of them.
+   */
+  @Override
+  public void clear() {
+    Collection<Pool<V>> registered = pools.values();
+    for (Pool<V> each : registered) {
+      each.clear();
+    }
+    pools.clear();
+  }
+
+ /**
+ * Returns the key set of the underlying pools map directly (not a copy).
+ *
+ * @return the keys that currently have a pool registered
+ */
+ @Override
+ public Set<K> keySet() {
+ return pools.keySet();
+ }
+
+  /**
+   * Builds a snapshot with one entry per pooled resource, so a key appears
+   * once for every resource its pool reports through <code>values()</code>.
+   *
+   * <p>
+   * Calling <code>setValue</code> on an entry offers the new value to the
+   * pool the entry came from; it does not replace the entry's own value.
+   * </p>
+   *
+   * @return a new, never-<code>null</code> set of key/resource entries
+   */
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
+    for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
+      final K poolKey = poolEntry.getKey();
+      final Pool<V> pool = poolEntry.getValue();
+      // Check the pool before touching it; the check used to sit inside the
+      // loop over pool.values(), after the pool had already been dereferenced.
+      if (pool == null) {
+        continue;
+      }
+      Collection<V> poolValues = pool.values();
+      if (poolValues == null) {
+        continue;
+      }
+      for (final V poolValue : poolValues) {
+        entries.add(new Map.Entry<K, V>() {
+          @Override
+          public K getKey() {
+            return poolKey;
+          }
+
+          @Override
+          public V getValue() {
+            return poolValue;
+          }
+
+          @Override
+          public V setValue(V value) {
+            return pool.put(value);
+          }
+        });
+      }
+    }
+    // Hand back the set that was built; this method used to return null.
+    return entries;
+  }
+
+ /**
+ * Contract implemented by the per-key resource pools that back a
+ * <code>PoolMap</code>. The exact semantics of each operation are decided
+ * by the implementing pool.
+ *
+ * @param <R>
+ * the type of the pooled resource
+ */
+ protected interface Pool<R> {
+ /** Obtains a resource from the pool; may return <code>null</code>. */
+ public R get();
+
+ /** Offers a resource to the pool; the return value is implementation-specific. */
+ public R put(R resource);
+
+ /** Removes the given resource and reports whether anything was removed. */
+ public boolean remove(R resource);
+
+ /** Discards the resources the pool holds. */
+ public void clear();
+
+ /** Returns the resources the pool currently reports as held. */
+ public Collection<R> values();
+
+ /** Returns the number of resources the pool currently accounts for. */
+ public int size();
+ }
+
+  /**
+   * The kinds of pool a <code>PoolMap</code> can create for each key.
+   */
+  public enum PoolType {
+    Reusable, ThreadLocal, RoundRobin;
+
+    /**
+     * Resolves a loosely written pool type name, restricted to a set of
+     * permitted types.
+     *
+     * @param poolTypeName
+     *          the name to resolve; matched via {@link #fuzzyMatch(String)}
+     * @param defaultPoolType
+     *          the type returned when the name is unknown or not permitted;
+     *          it is itself always permitted
+     * @param allowedPoolTypes
+     *          further permitted types; may be <code>null</code> or empty
+     * @return the matched type when it is permitted, otherwise
+     *         <code>defaultPoolType</code>
+     */
+    public static PoolType valueOf(String poolTypeName,
+        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
+      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
+      if (poolType == null || poolType == defaultPoolType) {
+        return defaultPoolType;
+      }
+      if (allowedPoolTypes != null) {
+        for (PoolType allowedPoolType : allowedPoolTypes) {
+          if (poolType == allowedPoolType) {
+            return poolType;
+          }
+        }
+      }
+      return defaultPoolType;
+    }
+
+    /**
+     * Reduces a name to the form used for comparison: hyphens removed,
+     * surrounding whitespace trimmed, lower-cased.
+     *
+     * @param name
+     *          the name to normalize; may be <code>null</code>
+     * @return the normalized name, or the empty string for <code>null</code>
+     */
+    public static String fuzzyNormalize(String name) {
+      if (name == null) {
+        return "";
+      }
+      // Locale.ROOT keeps the mapping stable: with a Turkish default locale
+      // "ROUNDROBIN" would lower-case to a dotless i and never match.
+      return name.replace("-", "").trim().toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Finds the pool type whose normalized name equals the normalized input,
+     * so "round-robin", "RoundRobin" and " ROUNDROBIN " all match.
+     *
+     * @param name
+     *          the name to match; may be <code>null</code>
+     * @return the matching type, or <code>null</code> when none matches
+     */
+    public static PoolType fuzzyMatch(String name) {
+      // Normalize the input once instead of once per candidate.
+      String normalizedName = fuzzyNormalize(name);
+      for (PoolType poolType : values()) {
+        if (normalizedName.equals(fuzzyNormalize(poolType.name()))) {
+          return poolType;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Instantiates a new, empty pool of the type this map was configured with,
+   * sized to <code>poolMaxSize</code>.
+   *
+   * @return the new pool, or <code>null</code> when the configured type
+   *         matches none of the known cases
+   */
+  protected Pool<V> createPool() {
+    Pool<V> created = null;
+    switch (poolType) {
+    case ThreadLocal:
+      created = new ThreadLocalPool<V>(poolMaxSize);
+      break;
+    case RoundRobin:
+      created = new RoundRobinPool<V>(poolMaxSize);
+      break;
+    case Reusable:
+      created = new ReusablePool<V>(poolMaxSize);
+      break;
+    }
+    return created;
+  }
+
+  /**
+   * A {@link PoolMap.Pool} layered on {@link LinkedList} that works on a
+   * check-out / check-in basis: {@link #get()} removes the resource it hands
+   * out, and the caller gives it back with {@link #put(Object)} once done so
+   * it can be reused.
+   *
+   * <p>
+   * {@link #maxSize} limits how many resources the pool will hold at once; a
+   * resource offered while the pool already holds that many is dropped. With
+   * {@link Integer#MAX_VALUE} the pool is effectively unbounded.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   */
+  @SuppressWarnings("serial")
+  public class ReusablePool<R> extends LinkedList<R> implements Pool<R> {
+    private int maxSize;
+
+    public ReusablePool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    /** Checks out the oldest pooled resource, or null when the pool is empty. */
+    @Override
+    public R get() {
+      return pollFirst();
+    }
+
+    /** Checks a resource in unless the pool is already at capacity; always returns null. */
+    @Override
+    public R put(R resource) {
+      boolean hasRoom = size() < maxSize;
+      if (hasRoom) {
+        add(resource);
+      }
+      return null;
+    }
+
+    /** Exposes this list itself as the view of pooled resources. */
+    @Override
+    public Collection<R> values() {
+      return this;
+    }
+  }
+
+  /**
+   * The <code>RoundRobinPool</code> represents a {@link PoolMap.Pool}, which
+   * stores its resources in an {@link ArrayList}. It load-balances access to
+   * its resources by returning a different resource every time a given key is
+   * looked up.
+   *
+   * <p>
+   * Until the pool has been filled up to {@link #maxSize}, {@link #get()}
+   * returns <code>null</code>, which prompts the caller to create a new
+   * resource and {@link #put(Object)} it. Once full, further puts are ignored
+   * and lookups cycle through the stored resources.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   *
+   */
+  @SuppressWarnings("serial")
+  class RoundRobinPool<R> extends ArrayList<R> implements Pool<R> {
+    private int maxSize;
+    private int nextResource = 0;
+
+    public RoundRobinPool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    /** Stores the resource unless the pool is already full; always returns null. */
+    @Override
+    public R put(R resource) {
+      if (size() < maxSize) {
+        add(resource);
+      }
+      return null;
+    }
+
+    /**
+     * Returns the next resource in rotation, or <code>null</code> while the
+     * pool is empty or not yet full.
+     */
+    @Override
+    public R get() {
+      // The isEmpty() guard protects the modulo below: with maxSize <= 0 the
+      // pool can never fill, and "% size()" would divide by zero.
+      if (isEmpty() || size() < maxSize) {
+        return null;
+      }
+      nextResource %= size();
+      return get(nextResource++);
+    }
+
+    /** Exposes this list itself as the view of pooled resources. */
+    @Override
+    public Collection<R> values() {
+      return this;
+    }
+
+  }
+
+  /**
+   * The <code>ThreadLocalPool</code> represents a {@link PoolMap.Pool} that
+   * builds on the {@link ThreadLocal} class. It essentially binds the resource
+   * to the thread from which it is accessed.
+   *
+   * <p>
+   * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of
+   * the pool is bounded only by the number of threads that add resources to
+   * this pool. Otherwise, it caps the number of threads that can set a value on
+   * this {@link ThreadLocal} instance to the (non-zero positive) value
+   * specified in {@link #maxSize}.
+   * </p>
+   *
+   * <p>
+   * The number of bound threads is tracked per pool instance in an
+   * {@link AtomicInteger}, so the count can be read and updated from any
+   * thread and is released together with the pool.
+   * </p>
+   *
+   * @param <R>
+   *          the type of the resource
+   */
+  static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
+    /** Number of threads that currently count as holding a resource. */
+    private final AtomicInteger poolSize = new AtomicInteger(0);
+
+    private final int maxSize;
+
+    public ThreadLocalPool(int maxSize) {
+      this.maxSize = maxSize;
+    }
+
+    /**
+     * Binds <code>resource</code> to the calling thread.
+     *
+     * @return the resource previously bound to this thread, or
+     *         <code>null</code> when there was none or when the pool is full
+     *         and the new resource was therefore rejected
+     */
+    @Override
+    public R put(R resource) {
+      R previousResource = get();
+      if (previousResource == null) {
+        // Reserve a slot with compare-and-set so that threads racing to bind
+        // their first resource cannot push the count past maxSize.
+        int current;
+        do {
+          current = poolSize.get();
+          if (current >= maxSize) {
+            return null;
+          }
+        } while (!poolSize.compareAndSet(current, current + 1));
+      }
+      this.set(resource);
+      return previousResource;
+    }
+
+    /**
+     * Unbinds the calling thread's resource and frees its slot. A thread that
+     * holds nothing leaves the count untouched.
+     */
+    @Override
+    public void remove() {
+      boolean hadResource = super.get() != null;
+      super.remove();
+      if (hadResource) {
+        poolSize.decrementAndGet();
+      }
+    }
+
+    /** Returns the number of threads currently counted as holding a resource. */
+    @Override
+    public int size() {
+      return poolSize.get();
+    }
+
+    /**
+     * Unbinds the calling thread's resource if it equals <code>resource</code>.
+     *
+     * @return <code>true</code> when the binding was removed
+     */
+    @Override
+    public boolean remove(R resource) {
+      R previousResource = super.get();
+      if (resource != null && resource.equals(previousResource)) {
+        remove();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    /**
+     * Drops the calling thread's binding, keeping the size count in step.
+     * Bindings held by other threads cannot be reached from here.
+     */
+    @Override
+    public void clear() {
+      remove();
+    }
+
+    /**
+     * Returns the calling thread's resource as a list of at most one element;
+     * the list is empty when this thread holds nothing.
+     */
+    @Override
+    public Collection<R> values() {
+      List<R> values = new ArrayList<R>();
+      R current = get();
+      if (current != null) {
+        values.add(current);
+      }
+      return values;
+    }
+  }
+}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java?rev=1095163&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/util/TestPoolMap.java Tue Apr 19 18:37:42 2011
@@ -0,0 +1,235 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+
+public class TestPoolMap {
+  /**
+   * Shared fixture for the per-pool-type tests: creates a fresh
+   * <code>PoolMap</code> of the subclass's pool type before each test, clears
+   * it afterwards, and offers a helper that does a put followed by a get on a
+   * separate thread.
+   */
+  public abstract static class TestPoolType extends TestCase {
+    protected PoolMap<String, String> poolMap;
+    protected Random random = new Random();
+
+    protected static final int POOL_SIZE = 3;
+
+    @Override
+    protected void setUp() throws Exception {
+      this.poolMap = new PoolMap<String, String>(getPoolType(), POOL_SIZE);
+    }
+
+    /** Supplies the pool type the concrete test class exercises. */
+    protected abstract PoolType getPoolType();
+
+    @Override
+    protected void tearDown() throws Exception {
+      this.poolMap.clear();
+    }
+
+    /**
+     * On a new thread, puts <code>randomValue</code> under
+     * <code>randomKey</code> and immediately gets from the same key, then
+     * asserts on the calling thread that the value read back equals
+     * <code>expectedValue</code> (a null expectation requires a null result).
+     */
+    protected void runThread(final String randomKey, final String randomValue,
+        final String expectedValue) throws InterruptedException {
+      final AtomicBoolean matchFound = new AtomicBoolean(false);
+      Runnable putThenGet = new Runnable() {
+        @Override
+        public void run() {
+          poolMap.put(randomKey, randomValue);
+          String actualValue = poolMap.get(randomKey);
+          boolean matches;
+          if (expectedValue == null) {
+            matches = actualValue == null;
+          } else {
+            matches = expectedValue.equals(actualValue);
+          }
+          matchFound.set(matches);
+        }
+      };
+      Thread worker = new Thread(putThenGet);
+      worker.start();
+      worker.join();
+      assertTrue(matchFound.get());
+    }
+  }
+
+ /**
+ * Exercises a PoolMap backed by round-robin pools of capacity POOL_SIZE.
+ */
+ public static class TestRoundRobinPoolType extends TestPoolType {
+ @Override
+ protected PoolType getPoolType() {
+ return PoolType.RoundRobin;
+ }
+
+ /** One put into an empty pool: the get yields null and the pool holds one value. */
+ public void testSingleThreadedClient() throws InterruptedException,
+ ExecutionException {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ // As long as the pool is not full, we'll get null back.
+ // This forces the user to create new values that can be used to populate
+ // the pool.
+ runThread(randomKey, randomValue, null);
+ assertEquals(1, poolMap.size(randomKey));
+ }
+
+ /**
+ * Puts from several threads, first under distinct keys and then under one
+ * shared key, always stopping short of filling a pool.
+ */
+ public void testMultiThreadedClients() throws InterruptedException,
+ ExecutionException {
+ for (int i = 0; i < POOL_SIZE; i++) {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ // As long as the pool is not full, we'll get null back
+ runThread(randomKey, randomValue, null);
+ // As long as we use distinct keys, each pool will have one value
+ assertEquals(1, poolMap.size(randomKey));
+ }
+ poolMap.clear();
+ String randomKey = String.valueOf(random.nextInt());
+ for (int i = 0; i < POOL_SIZE - 1; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ // As long as the pool is not full, we'll get null back
+ runThread(randomKey, randomValue, null);
+ // since we use the same key, the pool size should grow
+ assertEquals(i + 1, poolMap.size(randomKey));
+ }
+ // at the end of the day, there should be as many values as we put
+ assertEquals(POOL_SIZE - 1, poolMap.size(randomKey));
+ }
+
+ /**
+ * Puts twice as many values as the pool can hold under one key; the put
+ * that fills the pool and every later one must read back the stored values
+ * in rotation, and the pool must end up holding exactly POOL_SIZE values.
+ */
+ public void testPoolCap() throws InterruptedException, ExecutionException {
+ String randomKey = String.valueOf(random.nextInt());
+ List<String> randomValues = new ArrayList<String>();
+ for (int i = 0; i < POOL_SIZE * 2; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ randomValues.add(randomValue);
+ if (i < POOL_SIZE - 1) {
+ // As long as the pool is not full, we'll get null back
+ runThread(randomKey, randomValue, null);
+ } else {
+ // when the pool becomes full, we expect the value we get back to be
+ // what we put earlier, in round-robin order
+ runThread(randomKey, randomValue,
+ randomValues.get((i - POOL_SIZE + 1) % POOL_SIZE));
+ }
+ }
+ assertEquals(POOL_SIZE, poolMap.size(randomKey));
+ }
+
+ }
+
+ /**
+ * Exercises a PoolMap backed by thread-local pools of capacity POOL_SIZE.
+ * Every runThread call uses a new thread, so each put comes from a thread
+ * that has no value bound yet.
+ */
+ public static class TestThreadLocalPoolType extends TestPoolType {
+ @Override
+ protected PoolType getPoolType() {
+ return PoolType.ThreadLocal;
+ }
+
+ /** One put from one thread: that thread reads its own value back and the pool size is one. */
+ public void testSingleThreadedClient() throws InterruptedException,
+ ExecutionException {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ // As long as the pool is not full, we should get back what we put
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(1, poolMap.size(randomKey));
+ }
+
+ /**
+ * Puts from POOL_SIZE threads under distinct keys and then under one shared
+ * key; each thread reads its own value back and the shared pool's size
+ * grows by one per thread.
+ */
+ public void testMultiThreadedClients() throws InterruptedException,
+ ExecutionException {
+ // As long as the pool is not full, we should get back what we put
+ for (int i = 0; i < POOL_SIZE; i++) {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(1, poolMap.size(randomKey));
+ }
+ String randomKey = String.valueOf(random.nextInt());
+ for (int i = 0; i < POOL_SIZE; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(i + 1, poolMap.size(randomKey));
+ }
+ }
+
+ /**
+ * Puts from twice as many threads as the pool allows under one key; the
+ * first POOL_SIZE threads read their value back, the rest read null.
+ */
+ public void testPoolCap() throws InterruptedException, ExecutionException {
+ String randomKey = String.valueOf(random.nextInt());
+ for (int i = 0; i < POOL_SIZE * 2; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ if (i < POOL_SIZE) {
+ runThread(randomKey, randomValue, randomValue);
+ } else {
+ // When the pool fills up, we should not be able to put any new values
+ runThread(randomKey, randomValue, null);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Exercises a PoolMap backed by reusable pools of capacity POOL_SIZE. The
+ * helper thread gets right after it puts, which takes the value back out,
+ * so every test expects the pool to be empty afterwards.
+ */
+ public static class TestReusablePoolType extends TestPoolType {
+ @Override
+ protected PoolType getPoolType() {
+ return PoolType.Reusable;
+ }
+
+ /** One put followed by a get returns the same value and leaves the pool empty. */
+ public void testSingleThreadedClient() throws InterruptedException,
+ ExecutionException {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ // As long as we poll values we put, the pool size should remain zero
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(0, poolMap.size(randomKey));
+ }
+
+ /**
+ * Repeats the put-then-get cycle from several threads, first under distinct
+ * keys and then under one shared key; the pool size stays zero throughout.
+ */
+ public void testMultiThreadedClients() throws InterruptedException,
+ ExecutionException {
+ // As long as we poll values we put, the pool size should remain zero
+ for (int i = 0; i < POOL_SIZE; i++) {
+ String randomKey = String.valueOf(random.nextInt());
+ String randomValue = String.valueOf(random.nextInt());
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(0, poolMap.size(randomKey));
+ }
+ poolMap.clear();
+ String randomKey = String.valueOf(random.nextInt());
+ for (int i = 0; i < POOL_SIZE - 1; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ runThread(randomKey, randomValue, randomValue);
+ assertEquals(0, poolMap.size(randomKey));
+ }
+ assertEquals(0, poolMap.size(randomKey));
+ }
+
+ /**
+ * Runs twice as many put-then-get cycles as the pool's capacity under one
+ * key; since each value is taken out again at once, the cap is never
+ * reached and the pool ends up empty.
+ */
+ public void testPoolCap() throws InterruptedException, ExecutionException {
+ // As long as we poll values we put, the pool size should remain zero
+ String randomKey = String.valueOf(random.nextInt());
+ List<String> randomValues = new ArrayList<String>();
+ for (int i = 0; i < POOL_SIZE * 2; i++) {
+ String randomValue = String.valueOf(random.nextInt());
+ randomValues.add(randomValue);
+ runThread(randomKey, randomValue, randomValue);
+ }
+ assertEquals(0, poolMap.size(randomKey));
+ }
+
+ }
+
+ /**
+ * Builds the JUnit suite that runs the test classes for all three pool
+ * types: round-robin, thread-local and reusable.
+ *
+ * @return the combined suite
+ */
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ suite.addTestSuite(TestRoundRobinPoolType.class);
+ suite.addTestSuite(TestThreadLocalPoolType.class);
+ suite.addTestSuite(TestReusablePoolType.class);
+ return suite;
+ }
+
+}