Posted to commits@hbase.apache.org by md...@apache.org on 2018/08/01 21:54:44 UTC
hbase git commit: HBASE-20894 Use proto for BucketCache persistence
Repository: hbase
Updated Branches:
refs/heads/master 9b06361a5 -> 4bcaf495c
HBASE-20894 Use proto for BucketCache persistence
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4bcaf495
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4bcaf495
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4bcaf495
Branch: refs/heads/master
Commit: 4bcaf495c2aa10bb11fe9bb670d5015b039eac39
Parents: 9b06361
Author: Mike Drob <md...@apache.org>
Authored: Mon Jul 16 12:21:33 2018 -0500
Committer: Mike Drob <md...@apache.org>
Committed: Wed Aug 1 16:54:25 2018 -0500
----------------------------------------------------------------------
.../src/main/protobuf/BucketCacheEntry.proto | 79 ++++++++
.../hfile/CacheableDeserializerIdManager.java | 34 +++-
.../hadoop/hbase/io/hfile/HFileBlock.java | 12 +-
.../hbase/io/hfile/bucket/BucketCache.java | 183 ++++++++++--------
.../hbase/io/hfile/bucket/BucketProtoUtils.java | 191 +++++++++++++++++++
.../hbase/io/hfile/bucket/UniqueIndexMap.java | 56 ------
.../hadoop/hbase/io/hfile/CacheTestUtils.java | 1 +
.../hbase/io/hfile/bucket/TestBucketCache.java | 28 +--
.../io/hfile/bucket/TestBucketWriterThread.java | 7 +-
9 files changed, 424 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
new file mode 100644
index 0000000..d78acc0
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "BucketCacheProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message BucketCacheEntry {
+ required int64 cache_capacity = 1;
+ required string io_class = 2;
+ required string map_class = 3;
+ map<int32, string> deserializers = 4;
+ required BackingMap backing_map = 5;
+}
+
+message BackingMap {
+ repeated BackingMapEntry entry = 1;
+}
+
+message BackingMapEntry {
+ required BlockCacheKey key = 1;
+ required BucketEntry value = 2;
+}
+
+message BlockCacheKey {
+ required string hfilename = 1;
+ required int64 offset = 2;
+ required BlockType block_type = 3;
+ required bool primary_replica_block = 4;
+}
+
+enum BlockType {
+ data = 0;
+ encoded_data = 1;
+ leaf_index = 2;
+ bloom_chunk = 3;
+ meta = 4;
+ intermediate_index = 5;
+ root_index = 6;
+ file_info = 7;
+ general_bloom_meta = 8;
+ delete_family_bloom_meta = 9;
+ trailer = 10;
+ index_v1 = 11;
+}
+
+message BucketEntry {
+ required int64 offset = 1;
+ required int32 length = 2;
+ required int64 access_counter = 3;
+ required int32 deserialiser_index = 4;
+ required BlockPriority priority = 5;
+}
+
+enum BlockPriority {
+ single = 0;
+ multi = 1;
+ memory = 2;
+}
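The wire layout implied by this schema: the persistence file starts with the PB_MAGIC
preamble, followed by a single length-delimited BucketCacheEntry message (see
persistToFile/retrieveFromFile below). For illustration only -- not part of this commit --
a standalone dump tool for such a file might look like the following, assuming the shaded
generated BucketCacheProtos class and ProtobufMagic are on the classpath:

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

public final class DumpBucketCacheEntry {
  public static void main(String[] args) throws IOException {
    try (FileInputStream in = new FileInputStream(args[0])) {
      // Verify and skip the magic preamble written ahead of the message.
      byte[] magic = new byte[ProtobufMagic.lengthOfPBMagic()];
      if (in.read(magic) != magic.length || !ProtobufMagic.isPBMagicPrefix(magic)) {
        throw new IOException("Not a protobuf-format persistence file: " + args[0]);
      }
      BucketCacheProtos.BucketCacheEntry entry =
          BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
      System.out.println("capacity=" + entry.getCacheCapacity()
          + ", ioClass=" + entry.getIoClass()
          + ", entries=" + entry.getBackingMap().getEntryCount());
    }
  }
}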
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
index b1ed77d..bcc29c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
@@ -25,8 +25,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * This class is used to manage the identifiers for
- * {@link CacheableDeserializer}
+ * This class is used to manage the identifiers for {@link CacheableDeserializer}.
+ * All deserializers are registered with this manager via
+ * {@link #registerDeserializer(CacheableDeserializer)}. On registration, we return an
+ * int *identifier* for the deserializer. The int identifier is later passed to
+ * {@link #getDeserializer(int)} to obtain the registered deserializer instance.
*/
@InterfaceAudience.Private
public class CacheableDeserializerIdManager {
@@ -34,10 +37,11 @@ public class CacheableDeserializerIdManager {
private static final AtomicInteger identifier = new AtomicInteger(0);
/**
- * Register the given cacheable deserializer and generate an unique identifier
- * id for it
- * @param cd
+ * Register the given {@link Cacheable} deserializer -- usually the deserializer for an
+ * HFileBlock instance; these implement the Cacheable interface -- and generate a unique
+ * identifier for it, returning that identifier as our result.
* @return the identifier of given cacheable deserializer
+ * @see #getDeserializer(int)
*/
public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
int idx = identifier.incrementAndGet();
@@ -48,11 +52,25 @@ public class CacheableDeserializerIdManager {
}
/**
- * Get the cacheable deserializer as the given identifier Id
- * @param id
- * @return CacheableDeserializer
+ * Get the cacheable deserializer registered at the given identifier.
+ * @see #registerDeserializer(CacheableDeserializer)
*/
public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
return registeredDeserializers.get(id);
}
+
+ /**
+ * Snapshot a map of the current identifiers to class names, used to reconstruct the
+ * identifiers when reading the cache back out of a file.
+ */
+ public static Map<Integer,String> save() {
+ Map<Integer, String> snapshot = new HashMap<>();
+ synchronized (registeredDeserializers) {
+ for (Map.Entry<Integer, CacheableDeserializer<Cacheable>> entry :
+ registeredDeserializers.entrySet()) {
+ snapshot.put(entry.getKey(), entry.getValue().getClass().getName());
+ }
+ }
+ return snapshot;
+ }
}
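Note that the identifiers handed out by registerDeserializer are transient, per-JVM values:
the AtomicInteger starts over on every restart, so a raw id is meaningless across runs. That
is why save() snapshots id-to-class-name rather than bare ids, and why BucketProtoUtils.fromPB
(below) translates a stored id through its class name to the id the same class received in
the current runtime. A toy model of that remap technique -- all names here hypothetical, not
part of this commit:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public final class IdRemapDemo {
  private static final Map<Integer, String> registry = new HashMap<>();
  private static final AtomicInteger ids = new AtomicInteger(0);

  static int register(String className) {
    int id = ids.incrementAndGet();
    registry.put(id, className);
    return id;
  }

  public static void main(String[] args) {
    // "Previous run": register, then snapshot as save() would.
    int oldId = register("com.example.BlockDeserializer");
    Map<Integer, String> persisted = new HashMap<>(registry);

    // "New JVM": fresh registry, registration order differs.
    registry.clear();
    ids.set(0);
    register("com.example.SomeOtherDeserializer");
    int newId = register("com.example.BlockDeserializer");

    // Remap: resolve the stored id via its class name to the current id.
    String storedClass = persisted.get(oldId);
    int remapped = registry.entrySet().stream()
        .filter(e -> e.getValue().equals(storedClass))
        .map(Map.Entry::getKey)
        .findFirst()
        .orElseThrow(IllegalStateException::new);
    System.out.println("stored=" + oldId + " remapped=" + remapped + " current=" + newId);
  }
}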
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 238cd70..968a87e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -251,10 +251,14 @@ public class HFileBlock implements Cacheable {
* + Metadata! + <= See note on BLOCK_METADATA_SPACE above.
* ++++++++++++++
* </code>
- * @see #serialize(ByteBuffer)
+ * @see #serialize(ByteBuffer, boolean)
*/
- static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
- new CacheableDeserializer<Cacheable>() {
+ public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
+
+ public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
+ private BlockDeserializer() {
+ }
+
@Override
public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
throws IOException {
@@ -291,7 +295,7 @@ public class HFileBlock implements Cacheable {
// Used only in tests
return deserialize(b, false, MemoryType.EXCLUSIVE);
}
- };
+ }
private static final int DESERIALIZER_IDENTIFIER;
static {
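Promoting the anonymous deserializer to the named, public BlockDeserializer class matters for
persistence: CacheableDeserializerIdManager.save() records class names, and
BucketProtoUtils.fromPB (below) matches on HFileBlock.BlockDeserializer.class.getName() when
rebuilding the backing map. The context lines above show the companion registration pattern,
roughly (a sketch, not the full HFileBlock source):

// Register the singleton deserializer once, in a static initializer,
// and remember the identifier the manager assigned to it.
private static final int DESERIALIZER_IDENTIFIER;
static {
  DESERIALIZER_IDENTIFIER =
      CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
}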
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 40c0a00..fcebd02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,8 +25,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -81,6 +80,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
/**
* BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses
@@ -164,8 +164,6 @@ public class BucketCache implements BlockCache, HeapSize {
private volatile boolean freeInProgress = false;
private final Lock freeSpaceLock = new ReentrantLock();
- private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
-
private final LongAdder realCacheSize = new LongAdder();
private final LongAdder heapSize = new LongAdder();
/** Current number of cached elements */
@@ -299,10 +297,7 @@ public class BucketCache implements BlockCache, HeapSize {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
- LOG.error("Can't restore from file because of", ioex);
- } catch (ClassNotFoundException cnfe) {
- LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
- throw new RuntimeException(cnfe);
+ LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
}
}
final String threadName = Thread.currentThread().getName();
@@ -511,7 +506,7 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
}
Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
- bucketEntry.deserializerReference(this.deserialiserMap));
+ bucketEntry.deserializerReference());
long timeTaken = System.nanoTime() - start;
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
@@ -988,7 +983,7 @@ public class BucketCache implements BlockCache, HeapSize {
continue;
}
BucketEntry bucketEntry =
- re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+ re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@@ -1083,76 +1078,99 @@ public class BucketCache implements BlockCache, HeapSize {
return receptacle;
}
+ /**
+ * @see #retrieveFromFile(int[])
+ */
private void persistToFile() throws IOException {
assert !cacheEnabled;
- FileOutputStream fos = null;
- ObjectOutputStream oos = null;
- try {
- if (!ioEngine.isPersistent()) {
- throw new IOException("Attempt to persist non-persistent cache mappings!");
- }
- fos = new FileOutputStream(persistencePath, false);
- oos = new ObjectOutputStream(fos);
- oos.writeLong(cacheCapacity);
- oos.writeUTF(ioEngine.getClass().getName());
- oos.writeUTF(backingMap.getClass().getName());
- oos.writeObject(deserialiserMap);
- oos.writeObject(backingMap);
- } finally {
- if (oos != null) oos.close();
- if (fos != null) fos.close();
+ if (!ioEngine.isPersistent()) {
+ throw new IOException("Attempt to persist non-persistent cache mappings!");
+ }
+ try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
+ fos.write(ProtobufMagic.PB_MAGIC);
+ BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
}
}
- @SuppressWarnings("unchecked")
- private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
- ClassNotFoundException {
+ /**
+ * @see #persistToFile()
+ */
+ private void retrieveFromFile(int[] bucketSizes) throws IOException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
- FileInputStream fis = null;
- ObjectInputStream ois = null;
- try {
- if (!ioEngine.isPersistent())
- throw new IOException(
- "Attempt to restore non-persistent cache mappings!");
- fis = new FileInputStream(persistencePath);
- ois = new ObjectInputStream(fis);
- long capacitySize = ois.readLong();
- if (capacitySize != cacheCapacity)
- throw new IOException("Mismatched cache capacity:"
- + StringUtils.byteDesc(capacitySize) + ", expected: "
- + StringUtils.byteDesc(cacheCapacity));
- String ioclass = ois.readUTF();
- String mapclass = ois.readUTF();
- if (!ioEngine.getClass().getName().equals(ioclass))
- throw new IOException("Class name for IO engine mismatch: " + ioclass
- + ", expected:" + ioEngine.getClass().getName());
- if (!backingMap.getClass().getName().equals(mapclass))
- throw new IOException("Class name for cache map mismatch: " + mapclass
- + ", expected:" + backingMap.getClass().getName());
- UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
- .readObject();
- ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
- (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
- BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
- backingMapFromFile, realCacheSize);
- bucketAllocator = allocator;
- deserialiserMap = deserMap;
- backingMap = backingMapFromFile;
- } finally {
- if (ois != null) ois.close();
- if (fis != null) fis.close();
- if (!persistenceFile.delete()) {
- throw new IOException("Failed deleting persistence file "
- + persistenceFile.getAbsolutePath());
+
+ try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
+ int pblen = ProtobufMagic.lengthOfPBMagic();
+ byte[] pbuf = new byte[pblen];
+ int read = in.read(pbuf);
+ if (read != pblen) {
+ throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
+ + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
+ }
+ if (! ProtobufMagic.isPBMagicPrefix(pbuf)) {
+ // In 3.0 we have enough flexibility to dump the old cache data.
+ // TODO: In 2.x line, this might need to be filled in to support reading the old format
+ throw new IOException("Persistence file does not start with protobuf magic number. " +
+ persistencePath);
}
+ parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
+ bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
}
}
/**
+ * Create an input stream that deletes the file after reading it. Use in try-with-resources to
+ * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
+ * <pre>
+ * File f = ...
+ * try (FileInputStream fis = new FileInputStream(f)) {
+ * // use the input stream
+ * } finally {
+ * if (!f.delete()) throw new IOException("failed to delete");
+ * }
+ * </pre>
+ * @param file the file to read and delete
+ * @return a FileInputStream for the given file
+ * @throws IOException if there is a problem creating the stream
+ */
+ private FileInputStream deleteFileOnClose(final File file) throws IOException {
+ return new FileInputStream(file) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ if (!file.delete()) {
+ throw new IOException("Failed deleting persistence file " + file.getAbsolutePath());
+ }
+ }
+ };
+ }
+
+ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
+ throws IOException {
+ if (capacitySize != cacheCapacity) {
+ throw new IOException("Mismatched cache capacity:"
+ + StringUtils.byteDesc(capacitySize) + ", expected: "
+ + StringUtils.byteDesc(cacheCapacity));
+ }
+ if (!ioEngine.getClass().getName().equals(ioclass)) {
+ throw new IOException("Class name for IO engine mismatch: " + ioclass
+ + ", expected:" + ioEngine.getClass().getName());
+ }
+ if (!backingMap.getClass().getName().equals(mapclass)) {
+ throw new IOException("Class name for cache map mismatch: " + mapclass
+ + ", expected:" + backingMap.getClass().getName());
+ }
+ }
+
+ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
+ verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
+ backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
+ }
+
+ /**
* Check whether we tolerate IO error this time. If the duration of IOEngine
* throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the
* cache
@@ -1287,18 +1305,19 @@ public class BucketCache implements BlockCache, HeapSize {
private static final long serialVersionUID = -6741504807982257534L;
// access counter comparator, descending order
- static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
-
- @Override
- public int compare(BucketEntry o1, BucketEntry o2) {
- return Long.compare(o2.accessCounter, o1.accessCounter);
- }
- };
+ static final Comparator<BucketEntry> COMPARATOR = Comparator
+ .comparingLong(BucketEntry::getAccessCounter).reversed();
private int offsetBase;
private int length;
private byte offset1;
+
+ /**
+ * The index of the deserializer that can deserialize this BucketEntry content.
+ * See {@link CacheableDeserializerIdManager} for the mapping of index to deserializer.
+ */
byte deserialiserIndex;
+
private volatile long accessCounter;
private BlockPriority priority;
@@ -1335,17 +1354,16 @@ public class BucketCache implements BlockCache, HeapSize {
return length;
}
- protected CacheableDeserializer<Cacheable> deserializerReference(
- UniqueIndexMap<Integer> deserialiserMap) {
- return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
- .unmap(deserialiserIndex));
+ protected CacheableDeserializer<Cacheable> deserializerReference() {
+ return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
+ }
+
+ protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
+ this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier();
}
- protected void setDeserialiserReference(
- CacheableDeserializer<Cacheable> deserializer,
- UniqueIndexMap<Integer> deserialiserMap) {
- this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
- .getDeserialiserIdentifier()));
+ public long getAccessCounter() {
+ return accessCounter;
}
/**
@@ -1504,7 +1522,6 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketEntry writeToCache(final IOEngine ioEngine,
final BucketAllocator bucketAllocator,
- final UniqueIndexMap<Integer> deserialiserMap,
final LongAdder realCacheSize) throws CacheFullException, IOException,
BucketAllocatorException {
int len = data.getSerializedLength();
@@ -1516,7 +1533,7 @@ public class BucketCache implements BlockCache, HeapSize {
? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
: new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
: new BucketEntry(offset, len, accessCounter, inMemory);
- bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
+ bucketEntry.setDeserialiserReference(data.getDeserializer());
try {
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
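The deleteFileOnClose helper above is small enough to demonstrate in isolation. A
self-contained sketch of the delete-on-close idiom -- the class and file names here are
hypothetical, not part of this commit:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class DeleteOnCloseDemo {
  // Same idiom as BucketCache#deleteFileOnClose: close() deletes the file
  // and surfaces a failed delete as an IOException, instead of throwing
  // from a finally block where it could mask an earlier exception.
  static FileInputStream deleteFileOnClose(final File file) throws IOException {
    return new FileInputStream(file) {
      @Override
      public void close() throws IOException {
        super.close();
        if (!file.delete()) {
          throw new IOException("Failed deleting " + file.getAbsolutePath());
        }
      }
    };
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("delete-on-close", ".bin");
    try (FileInputStream in = deleteFileOnClose(f)) {
      in.read(); // use the stream normally
    }
    System.out.println("exists after close? " + f.exists()); // prints false
  }
}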
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
new file mode 100644
index 0000000..35daff7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
+
+@InterfaceAudience.Private
+final class BucketProtoUtils {
+ private BucketProtoUtils() {
+
+ }
+
+ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
+ return BucketCacheProtos.BucketCacheEntry.newBuilder()
+ .setCacheCapacity(cache.getMaxSize())
+ .setIoClass(cache.ioEngine.getClass().getName())
+ .setMapClass(cache.backingMap.getClass().getName())
+ .putAllDeserializers(CacheableDeserializerIdManager.save())
+ .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
+ .build();
+ }
+
+ private static BucketCacheProtos.BackingMap toPB(
+ Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
+ BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
+ for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
+ builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder()
+ .setKey(toPB(entry.getKey()))
+ .setValue(toPB(entry.getValue()))
+ .build());
+ }
+ return builder.build();
+ }
+
+ private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
+ return BucketCacheProtos.BlockCacheKey.newBuilder()
+ .setHfilename(key.getHfileName())
+ .setOffset(key.getOffset())
+ .setPrimaryReplicaBlock(key.isPrimary())
+ .setBlockType(toPB(key.getBlockType()))
+ .build();
+ }
+
+ private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
+ switch(blockType) {
+ case DATA:
+ return BucketCacheProtos.BlockType.data;
+ case META:
+ return BucketCacheProtos.BlockType.meta;
+ case TRAILER:
+ return BucketCacheProtos.BlockType.trailer;
+ case INDEX_V1:
+ return BucketCacheProtos.BlockType.index_v1;
+ case FILE_INFO:
+ return BucketCacheProtos.BlockType.file_info;
+ case LEAF_INDEX:
+ return BucketCacheProtos.BlockType.leaf_index;
+ case ROOT_INDEX:
+ return BucketCacheProtos.BlockType.root_index;
+ case BLOOM_CHUNK:
+ return BucketCacheProtos.BlockType.bloom_chunk;
+ case ENCODED_DATA:
+ return BucketCacheProtos.BlockType.encoded_data;
+ case GENERAL_BLOOM_META:
+ return BucketCacheProtos.BlockType.general_bloom_meta;
+ case INTERMEDIATE_INDEX:
+ return BucketCacheProtos.BlockType.intermediate_index;
+ case DELETE_FAMILY_BLOOM_META:
+ return BucketCacheProtos.BlockType.delete_family_bloom_meta;
+ default:
+ throw new Error("Unrecognized BlockType.");
+ }
+ }
+
+ private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
+ return BucketCacheProtos.BucketEntry.newBuilder()
+ .setOffset(entry.offset())
+ .setLength(entry.getLength())
+ .setDeserialiserIndex(entry.deserialiserIndex)
+ .setAccessCounter(entry.getAccessCounter())
+ .setPriority(toPB(entry.getPriority()))
+ .build();
+ }
+
+ private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
+ switch (p) {
+ case MULTI:
+ return BucketCacheProtos.BlockPriority.multi;
+ case MEMORY:
+ return BucketCacheProtos.BlockPriority.memory;
+ case SINGLE:
+ return BucketCacheProtos.BlockPriority.single;
+ default:
+ throw new Error("Unrecognized BlockPriority.");
+ }
+ }
+
+ static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
+ Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
+ throws IOException {
+ ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
+ for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
+ BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
+ BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+ protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+ BucketCacheProtos.BucketEntry protoValue = entry.getValue();
+ BucketCache.BucketEntry value = new BucketCache.BucketEntry(
+ protoValue.getOffset(),
+ protoValue.getLength(),
+ protoValue.getAccessCounter(),
+ protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
+ // This is the deserializer that we stored
+ int oldIndex = protoValue.getDeserialiserIndex();
+ String deserializerClass = deserializers.get(oldIndex);
+ if (deserializerClass == null) {
+ throw new IOException("Found deserializer index without matching entry.");
+ }
+ // Convert it to the identifier for the deserializer that we have in this runtime
+ if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
+ int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserialiserIdentifier();
+ value.deserialiserIndex = (byte) actualIndex;
+ } else {
+ // We could make this more pluggable, but right now HFileBlock is the only implementation
+ // of Cacheable outside of tests, so this might not ever matter.
+ throw new IOException("Unknown deserializer class found: " + deserializerClass);
+ }
+ result.put(key, value);
+ }
+ return result;
+ }
+
+ private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
+ switch (blockType) {
+ case data:
+ return BlockType.DATA;
+ case meta:
+ return BlockType.META;
+ case trailer:
+ return BlockType.TRAILER;
+ case index_v1:
+ return BlockType.INDEX_V1;
+ case file_info:
+ return BlockType.FILE_INFO;
+ case leaf_index:
+ return BlockType.LEAF_INDEX;
+ case root_index:
+ return BlockType.ROOT_INDEX;
+ case bloom_chunk:
+ return BlockType.BLOOM_CHUNK;
+ case encoded_data:
+ return BlockType.ENCODED_DATA;
+ case general_bloom_meta:
+ return BlockType.GENERAL_BLOOM_META;
+ case intermediate_index:
+ return BlockType.INTERMEDIATE_INDEX;
+ case delete_family_bloom_meta:
+ return BlockType.DELETE_FAMILY_BLOOM_META;
+ default:
+ throw new Error("Unrecognized BlockType.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
deleted file mode 100644
index ec297a5..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.bucket;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Map from type T to int and vice-versa. Used for reducing bit field item
- * counts.
- */
-@InterfaceAudience.Private
-public final class UniqueIndexMap<T> implements Serializable {
- private static final long serialVersionUID = -1145635738654002342L;
-
- ConcurrentHashMap<T, Integer> mForwardMap = new ConcurrentHashMap<>();
- ConcurrentHashMap<Integer, T> mReverseMap = new ConcurrentHashMap<>();
- AtomicInteger mIndex = new AtomicInteger(0);
-
- // Map a length to an index. If we can't, allocate a new mapping. We might
- // race here and get two entries with the same deserialiser. This is fine.
- int map(T parameter) {
- Integer ret = mForwardMap.get(parameter);
- if (ret != null) return ret.intValue();
- int nexti = mIndex.incrementAndGet();
- assert (nexti < Short.MAX_VALUE);
- mForwardMap.put(parameter, nexti);
- mReverseMap.put(nexti, parameter);
- return nexti;
- }
-
- T unmap(int leni) {
- Integer len = Integer.valueOf(leni);
- assert mReverseMap.containsKey(len);
- return mReverseMap.get(len);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index af28912..3c4ae78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -287,6 +287,7 @@ public class CacheTestUtils {
return deserializerIdentifier;
}
+
@Override
public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 924dd02..f4e4e53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -246,11 +248,13 @@ public class TestBucketCache {
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
- BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
- + "/bucket.persistence");
+ String ioEngineName = "file:" + testDir + "/bucket.cache";
+ String persistencePath = testDir + "/bucket.persistence";
+
+ BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize == 0);
+ assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
@@ -261,24 +265,26 @@ public class TestBucketCache {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
+ assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
+ assertTrue(new File(persistencePath).exists());
// restore cache from file
- bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
- + "/bucket.persistence");
+ bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
+ assertTrue(new File(persistencePath).exists());
// reconfigure bucket sizes: the biggest bucket is smaller than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
- bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, smallBucketSizes, writeThreads,
- writerQLen, testDir + "/bucket.persistence");
+ bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
http://git-wip-us.apache.org/repos/asf/hbase/blob/4bcaf495/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index a694fcb..4e7291d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@@ -143,8 +142,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
- writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
- (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
+ writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@@ -166,8 +164,7 @@ public class TestBucketWriterThread {
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
- when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
- (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
+ when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}