You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/03 06:31:34 UTC
svn commit: r1538320 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/
Author: stack
Date: Sun Nov 3 05:31:34 2013
New Revision: 1538320
URL: http://svn.apache.org/r1538320
Log:
HBASE-9855 evictBlocksByHfileName improvement for bucket cache
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java?rev=1538320&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java Sun Nov 3 05:31:34 2013
@@ -0,0 +1,173 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multiset;
+
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A simple concurrent map of sets. This is similar in concept to
+ * {@link Multiset}, with the following exceptions:
+ * <ul>
+ * <li>The set is thread-safe and concurrent: no external locking or
+ * synchronization is required. This is important for the use case where
+ * this class is used to index cached blocks by filename for their
+ * efficient eviction from cache when the file is closed or compacted.</li>
+ * <li>The expectation is that all entries may only be removed for a key
+ * once no more additions of values are being made under that key.</li>
+ * </ul>
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConcurrentIndex<K, V> {
+
+ /** Container for the sets, indexed by key */
+ private final ConcurrentMap<K, Set<V>> container;
+
+ /**
+ * A factory that constructs new instances of the sets if no set is
+ * associated with a given key.
+ */
+ private final Supplier<Set<V>> valueSetFactory;
+
+ /**
+ * Creates an instance with a specified factory object for sets to be
+ * associated with a given key.
+ * @param valueSetFactory The factory instance
+ */
+ public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
+ this.valueSetFactory = valueSetFactory;
+ this.container = new ConcurrentHashMap<K, Set<V>>();
+ }
+
+ /**
+ * Creates an instance using the DefaultValueSetFactory for sets,
+ * which in turn creates instances of {@link ConcurrentSkipListSet}
+ * @param valueComparator A {@link Comparator} for value types
+ */
+ public ConcurrentIndex(Comparator<V> valueComparator) {
+ this(new DefaultValueSetFactory<V>(valueComparator));
+ }
+
+ /**
+ * Associate a new unique value with a specified key. Under the covers, the
+ * method employs optimistic concurrency: if no set is associated with a
+ * given key, we create a new set; if another thread comes in, creates,
+ * and associates a set with the same key in the mean-time, we simply add
+ * the value to the already created set.
+ * @param key The key
+ * @param value An additional unique value we want to associate with a key
+ */
+ public void put(K key, V value) {
+ Set<V> set = container.get(key);
+ if (set != null) {
+ set.add(value);
+ } else {
+ set = valueSetFactory.get();
+ set.add(value);
+ Set<V> existing = container.putIfAbsent(key, set);
+ if (existing != null) {
+ // If a set is already associated with a key, that means another
+ // writer has already come in and created the set for the given key.
+ // Pursuant to an optimistic concurrency policy, in this case we will
+ // simply add the value to the existing set associated with the key.
+ existing.add(value);
+ }
+ }
+ }
+
+ /**
+ * Get all values associated with a specified key or null if no values are
+ * associated. <b>Note:</b> if the caller wishes to add or remove values
+ * under the specified key while iterating through the returned set,
+ * they should make a defensive copy; otherwise, a
+ * {@link ConcurrentModificationException} may be thrown.
+ * @param key The key
+ * @return All values associated with the specified key or null if no values
+ * are associated with the key.
+ */
+ public Set<V> values(K key) {
+ return container.get(key);
+ }
+
+ /**
+ * Removes the association between a specified key and value. If as a
+ * result of removing a value a set becomes empty, we remove the given
+ * set from the mapping as well.
+ * @param key The specified key
+ * @param value The value to disassociate with the key
+ */
+ public boolean remove(K key, V value) {
+ Set<V> set = container.get(key);
+ boolean success = false;
+ if (set != null) {
+ success = set.remove(value);
+ if (set.isEmpty()) {
+ container.remove(key);
+ }
+ }
+ return success;
+ }
+
+ /**
+ * Default factory class for the sets associated with given keys. Creates
+ * a {@link ConcurrentSkipListSet} using the comparator passed into the
+ * constructor.
+ * @see ConcurrentSkipListSet
+ * @see Supplier
+ * @param <V> The value type. Should match value type of the
+ * ConcurrentIndex instances of this object are passed to.
+ */
+ private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
+ private final Comparator<V> comparator;
+
+ /**
+ * Creates an instance that passes a specified comparator to the
+ * {@link ConcurrentSkipListSet}
+ * @param comparator The specified comparator
+ */
+ public DefaultValueSetFactory(Comparator<V> comparator) {
+ this.comparator = comparator;
+ }
+
+ /**
+ * Creates a new {@link ConcurrentSkipListSet} instance using the
+ * comparator specified when the class instance was constructed.
+ * @return The instantiated {@link ConcurrentSkipListSet} object
+ */
+ @Override
+ public Set<V> get() {
+ return new ConcurrentSkipListSet<V>(comparator);
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1538320&r1=1538319&r2=1538320&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Sun Nov 3 05:31:34 2013
@@ -97,4 +97,8 @@ public class BlockCacheKey implements He
public DataBlockEncoding getDataBlockEncoding() {
return encoding;
}
+
+ public long getOffset() {
+ return offset;
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java?rev=1538320&r1=1538319&r2=1538320&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java Sun Nov 3 05:31:34 2013
@@ -30,9 +30,11 @@ import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -58,11 +60,13 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.ConcurrentIndex;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -169,8 +173,19 @@ public class BucketCache implements Bloc
*/
private IdLock offsetLock = new IdLock();
+ private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
+ new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
+ @Override
+ public int compare(BlockCacheKey a, BlockCacheKey b) {
+ if (a.getOffset() == b.getOffset()) {
+ return 0;
+ } else if (a.getOffset() < b.getOffset()) {
+ return -1;
+ }
+ return 1;
+ }
+ });
-
/** Statistics thread schedule pool (for heavy debugging, could remove) */
private final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1,
@@ -322,6 +337,7 @@ public class BucketCache implements Bloc
} else {
this.blockNumber.incrementAndGet();
this.heapSize.addAndGet(cachedItem.heapSize());
+ blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
}
}
@@ -392,6 +408,7 @@ public class BucketCache implements Bloc
if (bucketEntry.equals(backingMap.remove(cacheKey))) {
bucketAllocator.freeBlock(bucketEntry.offset());
realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+ blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
if (removedBlock == null) {
this.blockNumber.decrementAndGet();
}
@@ -914,10 +931,7 @@ public class BucketCache implements Bloc
}
/**
- * Evicts all blocks for a specific HFile. This is an expensive operation
- * implemented as a linear-time search through all blocks in the cache.
- * Ideally this should be a search in a log-access-time map.
- *
+ * Evicts all blocks for a specific HFile.
* <p>
* This is used for evict-on-close to remove all blocks of a specific HFile.
*
@@ -925,13 +939,20 @@ public class BucketCache implements Bloc
*/
@Override
public int evictBlocksByHfileName(String hfileName) {
+ // Copy the key set to avoid ConcurrentModificationException
+ // as evictBlock removes the key from the index
+ Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
+ if (keySet == null) {
+ return 0;
+ }
int numEvicted = 0;
- for (BlockCacheKey key : this.backingMap.keySet()) {
- if (key.getHfileName().equals(hfileName)) {
- if (evictBlock(key))
+ List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
+ for (BlockCacheKey key : keysForHFile) {
+ if (evictBlock(key)) {
++numEvicted;
}
}
+
return numEvicted;
}