You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/03 06:30:57 UTC

svn commit: r1538319 - in /hbase/branches/0.96: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/

Author: stack
Date: Sun Nov  3 05:30:57 2013
New Revision: 1538319

URL: http://svn.apache.org/r1538319
Log:
HBASE-9855 evictBlocksByHfileName improvement for bucket cache

Added:
    hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Added: hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java?rev=1538319&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java (added)
+++ hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java Sun Nov  3 05:30:57 2013
@@ -0,0 +1,173 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multiset;
+
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A simple concurrent map of sets: each key maps to a set of unique values,
+ * much like Guava's {@code SetMultimap} (not a {@link Multiset}), except:
+ * <ul>
+ *   <li>The set is thread-safe and concurrent: no external locking or
+ *   synchronization is required. This is important for the use case where
+ *   this class is used to index cached blocks by filename for their
+ *   efficient eviction from cache when the file is closed or compacted.</li>
+ *   <li>The expectation is that all entries may only be removed for a key
+ *   once no more additions of values are being made under that key;
+ *   otherwise a value put concurrently with the last removal may be lost.</li>
+ * </ul>
+ * @param <K> Key type
+ * @param <V> Value type
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ConcurrentIndex<K, V> {
+
+  /** Container for the sets, indexed by key */
+  private final ConcurrentMap<K, Set<V>> container;
+
+  /** Creates a new value set when a value is put under a key that has none. */
+  private final Supplier<Set<V>> valueSetFactory;
+
+  /**
+   * Creates an instance with a specified factory object for sets to be
+   * associated with a given key.
+   * @param valueSetFactory The factory instance
+   */
+  public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
+    this.valueSetFactory = valueSetFactory;
+    this.container = new ConcurrentHashMap<K, Set<V>>();
+  }
+
+  /**
+   * Creates an instance using the DefaultValueSetFactory for sets,
+   * which in turn creates instances of {@link ConcurrentSkipListSet}.
+   * Values the comparator reports as equal (0) are treated as duplicates.
+   * @param valueComparator A {@link Comparator} for value types
+   */
+  public ConcurrentIndex(Comparator<V> valueComparator) {
+    this(new DefaultValueSetFactory<V>(valueComparator));
+  }
+
+  /**
+   * Associate a new unique value with a specified key. Under the covers, the
+   * method employs optimistic concurrency: if no set is associated with a
+   * given key, we create a new set; if another thread comes in, creates,
+   * and associates a set with the same key in the mean-time, we simply add
+   * the value to the already created set.
+   * @param key The key
+   * @param value An additional unique value we want to associate with a key
+   */
+  public void put(K key, V value) {
+    Set<V> set = container.get(key);
+    if (set != null) {
+      set.add(value);
+    } else {
+      set = valueSetFactory.get();
+      set.add(value);
+      Set<V> existing = container.putIfAbsent(key, set);
+      if (existing != null) {
+        // If a set is already associated with a key, that means another
+        // writer has already come in and created the set for the given key.
+        // Pursuant to an optimistic concurrency policy, in this case we will
+        // simply add the value to the existing set associated with the key.
+        existing.add(value);
+      }
+    }
+  }
+
+  /**
+   * Get all values associated with a specified key or null if no values are
+   * associated. <b>Note:</b> the returned set is the live backing set, not a
+   * copy; callers that add or remove values under the key while iterating
+   * should iterate over a defensive copy, since sets from a custom factory
+   * may have iterators that throw {@link ConcurrentModificationException}.
+   * @param key The key
+   * @return All values associated with the specified key or null if no values
+   *         are associated with the key.
+   */
+  public Set<V> values(K key) {
+    return container.get(key);
+  }
+
+  /**
+   * Removes the association between a specified key and value. If as a
+   * result of removing a value a set becomes empty, we remove the given
+   * set from the mapping as well.
+   * @param key The specified key
+   * @param value The value to disassociate with the key
+   * @return true if the value was associated with the key and has been removed
+   */
+  public boolean remove(K key, V value) {
+    Set<V> set = container.get(key);
+    boolean success = false;
+    if (set != null) {
+      success = set.remove(value);
+      if (set.isEmpty()) {
+        // Conditional remove: don't unmap a non-empty set another thread just installed.
+        container.remove(key, set);
+      }
+    }
+    return success;
+  }
+
+  /**
+   * Default factory class for the sets associated with given keys. Creates
+   * a {@link ConcurrentSkipListSet} using the comparator passed into the
+   * constructor.
+   * @see ConcurrentSkipListSet
+   * @see Supplier
+   * @param <V> The value type; must match that of the owning ConcurrentIndex.
+   */
+  private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> {
+    private final Comparator<V> comparator;
+
+    /**
+     * Creates an instance that passes a specified comparator to the
+     * {@link ConcurrentSkipListSet}
+     * @param comparator The specified comparator
+     */
+    public DefaultValueSetFactory(Comparator<V> comparator) {
+      this.comparator = comparator;
+    }
+
+    /**
+     * Creates a new {@link ConcurrentSkipListSet} instance using the
+     * comparator specified when the class instance was constructed.
+     * @return The instantiated {@link ConcurrentSkipListSet} object
+     */
+    @Override
+    public Set<V> get() {
+      return new ConcurrentSkipListSet<V>(comparator);
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1538319&r1=1538318&r2=1538319&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Sun Nov  3 05:30:57 2013
@@ -97,4 +97,8 @@ public class BlockCacheKey implements He
   public DataBlockEncoding getDataBlockEncoding() {
     return encoding;
   }
+
+  public long getOffset() {
+    return this.offset;
+  }
 }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java?rev=1538319&r1=1538318&r2=1538319&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java Sun Nov  3 05:30:57 2013
@@ -30,9 +30,11 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,11 +60,13 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -169,8 +173,19 @@ public class BucketCache implements Bloc
    */
   private IdLock offsetLock = new IdLock();
 
+  private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
+      new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
+        @Override
+        public int compare(BlockCacheKey a, BlockCacheKey b) {
+          if (a.getOffset() == b.getOffset()) {
+            return 0;
+          } else if (a.getOffset() < b.getOffset()) {
+            return -1;
+          }
+          return 1;
+        }
+      });
 
-  
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
   private final ScheduledExecutorService scheduleThreadPool =
     Executors.newScheduledThreadPool(1,
@@ -322,6 +337,7 @@ public class BucketCache implements Bloc
     } else {
       this.blockNumber.incrementAndGet();
       this.heapSize.addAndGet(cachedItem.heapSize());
+      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
     }
   }
 
@@ -392,6 +408,7 @@ public class BucketCache implements Bloc
         if (bucketEntry.equals(backingMap.remove(cacheKey))) {
           bucketAllocator.freeBlock(bucketEntry.offset());
           realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+          blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
           if (removedBlock == null) {
             this.blockNumber.decrementAndGet();
           }
@@ -914,10 +931,7 @@ public class BucketCache implements Bloc
   }
 
   /**
-   * Evicts all blocks for a specific HFile. This is an expensive operation
-   * implemented as a linear-time search through all blocks in the cache.
-   * Ideally this should be a search in a log-access-time map.
-   * 
+   * Evicts all blocks for a specific HFile. 
    * <p>
    * This is used for evict-on-close to remove all blocks of a specific HFile.
    * 
@@ -925,13 +939,20 @@ public class BucketCache implements Bloc
    */
   @Override
   public int evictBlocksByHfileName(String hfileName) {
+    // Take a snapshot of the keys before evicting: evictBlock may remove keys
+    // from blocksByHFile, so we must not iterate the live index set meanwhile.
+    Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName);
+    if (keySet == null) {
+      return 0;
+    }
     int numEvicted = 0;
-    for (BlockCacheKey key : this.backingMap.keySet()) {
-      if (key.getHfileName().equals(hfileName)) {
-        if (evictBlock(key))
+    List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
+    for (BlockCacheKey key : keysForHFile) {
+      if (evictBlock(key)) {
           ++numEvicted;
       }
     }
+    
     return numEvicted;
   }