Posted to commits@hbase.apache.org by re...@apache.org on 2019/09/16 09:52:02 UTC
[hbase] branch branch-1 updated: Revert "HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode"
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 3b6cff5 Revert "HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode"
3b6cff5 is described below
commit 3b6cff590e99245cd972193d32ef73618cce41b3
Author: Reid Chan <re...@apache.org>
AuthorDate: Mon Sep 16 17:50:57 2019 +0800
Revert "HBASE-22890 Verify the files when RegionServer is starting and BucketCache is in file mode"
Reason: There are still concerns about whether to delete the cached data file.
This reverts commit 5bf60ec55fdf637d80492b61e6e6d8d605c5ef4a.
---
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 75 ++----
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 152 +----------
.../hbase/io/hfile/bucket/PersistentIOEngine.java | 59 ----
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 2 +-
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 297 ---------------------
5 files changed, 32 insertions(+), 553 deletions(-)
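For context, the reverted HBASE-22890 change verified cache-file integrity by fingerprinting each file's path, size, and last-modified time, hashing the result with a configurable MessageDigest (MD5 by default), and comparing it against a checksum stored in the persistence file. A minimal standalone sketch of that idea (illustrative only; it uses File.length() where the reverted code shelled out to du, and the class name is hypothetical):

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    // Sketch of the reverted verification idea: fingerprint each cache
    // file by path + size + mtime, then hash the concatenation.
    class ChecksumSketch {
      static byte[] checksum(String algorithm, String... filePaths)
          throws IOException, NoSuchAlgorithmException {
        StringBuilder sb = new StringBuilder();
        for (String filePath : filePaths) {
          File file = new File(filePath);
          if (!file.exists()) {
            throw new IOException("Cache file " + filePath + " does not exist");
          }
          // File.length() stands in for the du-based size the reverted code used.
          sb.append(filePath).append(file.length()).append(file.lastModified());
        }
        MessageDigest md = MessageDigest.getInstance(algorithm);
        return md.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
      }
    }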
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 98abfc8..c5a1b21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -29,7 +29,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
@@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -244,17 +242,6 @@ public class BucketCache implements BlockCache, HeapSize {
/** In-memory bucket size */
private float memoryFactor;
- private String ioEngineName;
- private static final String FILE_VERIFY_ALGORITHM =
- "hbase.bucketcache.persistent.file.integrity.check.algorithm";
- private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
-
- /**
- * Use {@link java.security.MessageDigest} class's encryption algorithms to check
- * persistent file integrity, default algorithm is MD5
- * */
- private String algorithm;
-
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
@@ -265,7 +252,8 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
- throws IOException {
+ throws FileNotFoundException, IOException {
+ this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -287,7 +275,6 @@ public class BucketCache implements BlockCache, HeapSize {
", memoryFactor: " + memoryFactor);
this.cacheCapacity = capacity;
- this.ioEngineName = ioEngineName;
this.persistencePath = persistencePath;
this.blockSize = blockSize;
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
@@ -301,15 +288,14 @@ public class BucketCache implements BlockCache, HeapSize {
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();
this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);
- this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
- ioEngine = getIOEngineFromName();
+
if (ioEngine.isPersistent() && persistencePath != null) {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
- LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
+ LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
throw new RuntimeException(cnfe);
}
}
@@ -373,10 +359,12 @@ public class BucketCache implements BlockCache, HeapSize {
/**
* Get the IOEngine from the IO engine name
+ * @param ioEngineName
+ * @param capacity
* @return the IOEngine
* @throws IOException
*/
- private IOEngine getIOEngineFromName()
+ private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
throws IOException {
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
// In order to make the usage simple, we only need the prefix 'files:' in
@@ -384,11 +372,11 @@ public class BucketCache implements BlockCache, HeapSize {
// the compatibility
String[] filePaths =
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
- return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths);
+ return new FileIOEngine(capacity, filePaths);
} else if (ioEngineName.startsWith("offheap"))
- return new ByteBufferIOEngine(cacheCapacity, true);
+ return new ByteBufferIOEngine(capacity, true);
else if (ioEngineName.startsWith("heap"))
- return new ByteBufferIOEngine(cacheCapacity, false);
+ return new ByteBufferIOEngine(capacity, false);
else
throw new IllegalArgumentException(
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
@@ -1033,48 +1021,41 @@ public class BucketCache implements BlockCache, HeapSize {
private void persistToFile() throws IOException {
assert !cacheEnabled;
- try (ObjectOutputStream oos = new ObjectOutputStream(
- new FileOutputStream(persistencePath, false))){
+ FileOutputStream fos = null;
+ ObjectOutputStream oos = null;
+ try {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
- if (ioEngine instanceof PersistentIOEngine) {
- oos.write(ProtobufUtil.PB_MAGIC);
- byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum();
- oos.writeInt(checksum.length);
- oos.write(checksum);
- }
+ fos = new FileOutputStream(persistencePath, false);
+ oos = new ObjectOutputStream(fos);
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
- } catch (NoSuchAlgorithmException e) {
- LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e);
+ } finally {
+ if (oos != null) oos.close();
+ if (fos != null) fos.close();
}
}
@SuppressWarnings("unchecked")
- private void retrieveFromFile(int[] bucketSizes) throws IOException,
+ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
- try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){
+ FileInputStream fis = null;
+ ObjectInputStream ois = null;
+ try {
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
- // for backward compatibility
- if (ioEngine instanceof PersistentIOEngine &&
- !((PersistentIOEngine) ioEngine).isOldVersion()) {
- byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
- ois.read(PBMagic);
- int length = ois.readInt();
- byte[] persistenceChecksum = new byte[length];
- ois.read(persistenceChecksum);
- }
+ fis = new FileInputStream(persistencePath);
+ ois = new ObjectInputStream(fis);
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
@@ -1097,8 +1078,9 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator = allocator;
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
- blockNumber.set(backingMap.size());
} finally {
+ if (ois != null) ois.close();
+ if (fis != null) fis.close();
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
@@ -1615,9 +1597,4 @@ public class BucketCache implements BlockCache, HeapSize {
float getMemoryFactor() {
return memoryFactor;
}
-
- @VisibleForTesting
- public UniqueIndexMap<Integer> getDeserialiserMap() {
- return deserialiserMap;
- }
}
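With the checksum header gone, the persistence file returns to the pre-HBASE-22890 layout written by persistToFile() above: a long capacity, two UTF class names, then the two serialized maps. A reader sketch of that layout (illustrative; the class and method names are hypothetical):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;

    // Reads the post-revert persistence layout: no PB_MAGIC prefix and
    // no length-prefixed checksum, just capacity, class names, and maps.
    class PersistenceLayoutSketch {
      static void dump(String persistencePath)
          throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
            new ObjectInputStream(new FileInputStream(persistencePath))) {
          long capacity = ois.readLong();      // must match cacheCapacity
          String engineClass = ois.readUTF();  // IOEngine implementation name
          String mapClass = ois.readUTF();     // backing map implementation name
          Object deserialiserMap = ois.readObject();
          Object backingMap = ois.readObject();
          System.out.println(capacity + " " + engineClass + " " + mapClass);
        }
      }
    }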
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index f26c6c5..7d3a9fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -19,16 +19,12 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
@@ -36,20 +32,15 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
/**
* IO engine that stores data to a file on the local file system.
*/
@InterfaceAudience.Private
-public class FileIOEngine implements PersistentIOEngine {
+public class FileIOEngine implements IOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
public static final String FILE_DELIMITER = ",";
- private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
-
private final String[] filePaths;
private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs;
@@ -57,58 +48,17 @@ public class FileIOEngine implements PersistentIOEngine {
private final long sizePerFile;
private final long capacity;
- private final String algorithmName;
- private boolean oldVersion;
private FileReadAccessor readAccessor = new FileReadAccessor();
private FileWriteAccessor writeAccessor = new FileWriteAccessor();
- public FileIOEngine(String algorithmName, String persistentPath,
- long capacity, String... filePaths) throws IOException {
+ public FileIOEngine(long capacity, String... filePaths) throws IOException {
this.sizePerFile = capacity / filePaths.length;
this.capacity = this.sizePerFile * filePaths.length;
this.filePaths = filePaths;
this.fileChannels = new FileChannel[filePaths.length];
this.rafs = new RandomAccessFile[filePaths.length];
this.channelLocks = new ReentrantLock[filePaths.length];
- this.algorithmName = algorithmName;
- verifyFileIntegrity(persistentPath);
- init();
- }
-
- /**
- * Verify cache files's integrity
- * @param persistentPath the backingMap persistent path
- */
- @Override
- public void verifyFileIntegrity(String persistentPath) {
- if (persistentPath != null) {
- byte[] persistentChecksum = readPersistentChecksum(persistentPath);
- if (!oldVersion) {
- try {
- byte[] calculateChecksum = calculateChecksum();
- if (!Bytes.equals(persistentChecksum, calculateChecksum)) {
- LOG.warn("The persistent checksum is " + Bytes.toString(persistentChecksum) +
- ", but the calculate checksum is " + Bytes.toString(calculateChecksum));
- throw new IOException();
- }
- } catch (IOException ioex) {
- LOG.error("File verification failed because of ", ioex);
- // delete cache files and backingMap persistent file.
- deleteCacheDataFile();
- new File(persistentPath).delete();
- } catch (NoSuchAlgorithmException nsae) {
- LOG.error("No such algorithm " + algorithmName, nsae);
- throw new RuntimeException(nsae);
- }
- }
- } else {
- // not configure persistent path
- deleteCacheDataFile();
- }
- }
-
- private void init() throws IOException {
for (int i = 0; i < filePaths.length; i++) {
String filePath = filePaths[i];
try {
@@ -118,15 +68,15 @@ public class FileIOEngine implements PersistentIOEngine {
// The next setting length will throw exception,logging this message
// is just used for the detail reason of exception,
String msg = "Only " + StringUtils.byteDesc(totalSpace)
- + " total space under " + filePath + ", not enough for requested "
- + StringUtils.byteDesc(sizePerFile);
+ + " total space under " + filePath + ", not enough for requested "
+ + StringUtils.byteDesc(sizePerFile);
LOG.warn(msg);
}
rafs[i].setLength(sizePerFile);
fileChannels[i] = rafs[i].getChannel();
channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
- + ", on the path: " + filePath);
+ + ", on the path:" + filePath);
} catch (IOException fex) {
LOG.error("Failed allocating cache on " + filePath, fex);
shutdown();
@@ -317,98 +267,6 @@ public class FileIOEngine implements PersistentIOEngine {
}
}
- /**
- * Read the persistent checksum from persistent path
- * @param persistentPath the backingMap persistent path
- * @return the persistent checksum
- */
- private byte[] readPersistentChecksum(String persistentPath) {
- try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistentPath))) {
- byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
- ois.read(PBMagic);
- if (Bytes.equals(ProtobufUtil.PB_MAGIC, PBMagic)) {
- int length = ois.readInt();
- byte[] persistentChecksum = new byte[length];
- ois.read(persistentChecksum);
- return persistentChecksum;
- } else {
- // if the persistent file is not start with PB_MAGIC, it's an old version file
- oldVersion = true;
- }
- } catch (IOException ioex) {
- LOG.warn("Failed read persistent checksum, because of " + ioex);
- return null;
- }
- return null;
- }
-
- @Override
- public void deleteCacheDataFile() {
- if (filePaths == null) {
- return;
- }
- for (String file : filePaths) {
- new File(file).delete();
- }
- }
-
- @Override
- public byte[] calculateChecksum()
- throws IOException, NoSuchAlgorithmException {
- if (filePaths == null) {
- return null;
- }
- StringBuilder sb = new StringBuilder();
- for (String filePath : filePaths){
- File file = new File(filePath);
- if (file.exists()){
- sb.append(filePath);
- sb.append(getFileSize(filePath));
- sb.append(file.lastModified());
- } else {
- throw new IOException("Cache file: " + filePath + " is not exists.");
- }
- }
- MessageDigest messageDigest = MessageDigest.getInstance(algorithmName);
- messageDigest.update(Bytes.toBytes(sb.toString()));
- return messageDigest.digest();
- }
-
- @Override
- public boolean isOldVersion() {
- return oldVersion;
- }
-
- /**
- * Using Linux command du to get file's real size
- * @param filePath the file
- * @return file's real size
- * @throws IOException something happened like file not exists
- */
- private static long getFileSize(String filePath) throws IOException {
- DU.setExecCommand(filePath);
- DU.execute();
- return Long.parseLong(DU.getOutput().split("\t")[0]);
- }
-
- private static class DuFileCommand extends Shell.ShellCommandExecutor {
- private String[] execCommand;
-
- DuFileCommand(String[] execString) {
- super(execString);
- execCommand = execString;
- }
-
- void setExecCommand(String filePath) {
- this.execCommand[1] = filePath;
- }
-
- @Override
- public String[] getExecString() {
- return this.execCommand;
- }
- }
-
private static interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
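The removed getFileSize() above obtained a file's on-disk size by running the Linux du command through Hadoop's Shell.ShellCommandExecutor and parsing the first tab-separated field. A self-contained sketch of the same idea using plain ProcessBuilder (illustrative, not the reverted code; Linux-only, and du reports block counts rather than bytes):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    // Runs `du <file>` and parses the size column, mirroring the
    // removed DuFileCommand without the Hadoop Shell dependency.
    class DuSketch {
      static long duSize(String filePath) throws IOException, InterruptedException {
        Process p = new ProcessBuilder("du", filePath).start();
        try (BufferedReader r =
            new BufferedReader(new InputStreamReader(p.getInputStream()))) {
          String line = r.readLine();  // e.g. "8\t/path/to/file"
          if (p.waitFor() != 0 || line == null) {
            throw new IOException("du failed for " + filePath);
          }
          return Long.parseLong(line.split("\t")[0]);
        }
      }
    }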
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
deleted file mode 100644
index 556f5c5..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.bucket;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * A class implementing PersistentIOEngine interface supports persistent and file integrity verify
- * for {@link BucketCache}
- */
-@InterfaceAudience.Private
-public interface PersistentIOEngine extends IOEngine {
-
- /**
- * Delete bucketcache files
- */
- void deleteCacheDataFile();
-
- /**
- * Using an encryption algorithm to calculate a checksum, the default encryption algorithm is MD5
- * @return the checksum which is convert to HexString
- * @throws IOException something happened like file not exists
- * @throws NoSuchAlgorithmException no such algorithm
- */
- byte[] calculateChecksum()
- throws IOException, NoSuchAlgorithmException;
-
- /**
- * Whether the persistent file support verify file integrity, old version file
- * does not support verification, it's for back compatibility
- * @return true if the persistent file does not support verify file integrity
- */
- boolean isOldVersion();
-
- /**
- * Verify cache files's integrity
- * @param persistentPath the backingMap persistent path
- */
- void verifyFileIntegrity(String persistentPath);
-}
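The deleted interface's isOldVersion() contract hinged on a simple probe: if the persistence file does not begin with the PB_MAGIC prefix, it predates the checksum header and must be read the old way. A hedged sketch of that probe (illustrative; the inlined PB_MAGIC bytes are a stand-in for org.apache.hadoop.hbase.protobuf.ProtobufUtil.PB_MAGIC):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.util.Arrays;

    // Peek at the stream head; absence of the magic prefix marks an
    // old-version persistence file with no checksum block to skip.
    class VersionProbeSketch {
      static final byte[] PB_MAGIC = { 'P', 'B', 'U', 'F' };  // stand-in value

      static boolean isOldVersion(String persistencePath) throws IOException {
        try (ObjectInputStream ois =
            new ObjectInputStream(new FileInputStream(persistencePath))) {
          byte[] head = new byte[PB_MAGIC.length];
          ois.readFully(head);  // readFully avoids short reads
          return !Arrays.equals(PB_MAGIC, head);
        }
      }
    }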
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index d85aec9..6e677d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -68,7 +68,7 @@ public class TestFileIOEngine {
@Before
public void setUp() throws IOException {
- fileIOEngine = new FileIOEngine("MD5", null, TOTAL_CAPACITY, FILE_PATHS);
+ fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, FILE_PATHS);
}
@After
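As this test change shows, the FileIOEngine constructor reverts to taking only a capacity and file paths. An illustrative instantiation (the paths are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine;

    // Post-revert constructor: capacity is split evenly across the files;
    // no checksum algorithm or persistence path is passed in.
    class EngineUsageSketch {
      static FileIOEngine open() throws IOException {
        return new FileIOEngine(32L * 1024 * 1024,
            "/tmp/bucket.cache1", "/tmp/bucket.cache2");
      }
    }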
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
deleted file mode 100644
index c54315f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.bucket;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
-import org.apache.hadoop.hbase.io.hfile.Cacheable;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Basic test for check file's integrity before start BucketCache in fileIOEngine
- */
-@RunWith(Parameterized.class)
-@Category(SmallTests.class)
-public class TestVerifyBucketCacheFile {
- @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][] { { 8192, null }, { 16 * 1024,
- new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
- 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
- 128 * 1024 + 1024 } } });
- }
-
- @Parameterized.Parameter(0)
- public int constructedBlockSize;
-
- @Parameterized.Parameter(1)
- public int[] constructedBlockSizes;
-
- final long capacitySize = 32 * 1024 * 1024;
- final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
- final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
-
- /**
- * Test cache file or persistence file does not exist whether BucketCache starts normally
- * (1) Start BucketCache and add some blocks, then shutdown BucketCache and persist cache
- * to file. Restart BucketCache and it can restore cache from file.
- * (2) Delete bucket cache file after shutdown BucketCache. Restart BucketCache and it can't
- * restore cache from file, the cache file and persistence file would be deleted before
- * BucketCache start normally.
- * (3) Delete persistence file after shutdown BucketCache. Restart BucketCache and it can't
- * restore cache from file, the cache file and persistence file would be deleted before
- * BucketCache start normally.
- * @throws Exception the exception
- */
- @Test
- public void testRetrieveFromFile() throws Exception {
- HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- Path testDir = TEST_UTIL.getDataTestDir();
- TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize == 0);
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
- // 1.persist cache to file
- bucketCache.shutdown();
- // restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
- // persist cache to file
- bucketCache.shutdown();
-
- // 2.delete bucket cache file
- File cacheFile = new File(testDir + "/bucket.cache");
- assertTrue(cacheFile.delete());
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
- // persist cache to file
- bucketCache.shutdown();
-
- // 3.delete backingMap persistence file
- File mapFile = new File(testDir + "/bucket.persistence");
- assertTrue(mapFile.delete());
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
-
- TEST_UTIL.cleanupTestDir();
- }
-
- /**
- * Test whether BucketCache is started normally after modifying the cache file.
- * Start BucketCache and add some blocks, then shutdown BucketCache and persist cache to file.
- * Restart BucketCache after modify cache file's data, and it can't restore cache from file,
- * the cache file and persistence file would be deleted before BucketCache start normally.
- * @throws Exception the exception
- */
- @Test
- public void testModifiedBucketCacheFileData() throws Exception {
- HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- Path testDir = TEST_UTIL.getDataTestDir();
- TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize == 0);
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
- // persist cache to file
- bucketCache.shutdown();
-
- // modified bucket cache file
- String file = testDir + "/bucket.cache";
- try(BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(file, true)))) {
- out.write("test bucket cache");
- }
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
-
- TEST_UTIL.cleanupTestDir();
- }
-
- /**
- * Test whether BucketCache is started normally after modifying the cache file's last modified
- * time. First Start BucketCache and add some blocks, then shutdown BucketCache and persist
- * cache to file. Then Restart BucketCache after modify cache file's last modified time, and
- * it can't restore cache from file, the cache file and persistence file would be deleted
- * before BucketCache start normally.
- * @throws Exception the exception
- */
- @Test
- public void testModifiedBucketCacheFileTime() throws Exception {
- HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- Path testDir = TEST_UTIL.getDataTestDir();
- TEST_UTIL.getTestFileSystem().mkdirs(testDir);
-
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize == 0);
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
- // persist cache to file
- bucketCache.shutdown();
-
- // modified bucket cache file LastModifiedTime
- File file = new File(testDir + "/bucket.cache");
- assertTrue(file.setLastModified(System.currentTimeMillis() + 1000));
- // can't restore cache from file
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
-
- TEST_UTIL.cleanupTestDir();
- }
-
- /**
- * Test whether it can read the old version's persistence file, it's for backward compatibility.
- * Start BucketCache and add some blocks, then persist cache to file in old way and shutdown
- * BucketCache. Restart BucketCache, and it can normally restore from old version persistence
- * file.
- * @throws Exception the exception
- */
- @Test
- public void compatibilityTest() throws Exception {
- HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- Path testDir = TEST_UTIL.getDataTestDir();
- TEST_UTIL.getTestFileSystem().mkdirs(testDir);
- String persistencePath = testDir + "/bucket.persistence";
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, persistencePath);
- long usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize == 0);
-
- CacheTestUtils.HFileBlockPair[] blocks =
- CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
- }
- usedSize = bucketCache.getAllocator().getUsedSize();
- assertTrue(usedSize != 0);
- // persistence backingMap using old way
- persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(),
- bucketCache.backingMap, bucketCache.getDeserialiserMap());
- bucketCache.shutdown();
-
- // restore cache from file which skip check checksum
- bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, persistencePath + ".old");
- assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
- assertEquals(blocks.length, bucketCache.backingMap.size());
- }
-
- private void persistToFileInOldWay(String persistencePath, long cacheCapacity,
- ConcurrentMap backingMap, UniqueIndexMap deserialiserMap)
- throws IOException {
- try(ObjectOutputStream oos = new ObjectOutputStream(
- new FileOutputStream(persistencePath, false))) {
- oos.writeLong(cacheCapacity);
- oos.writeUTF(FileIOEngine.class.getName());
- oos.writeUTF(backingMap.getClass().getName());
- oos.writeObject(deserialiserMap);
- oos.writeObject(backingMap);
- }
- }
-
- private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
- throws InterruptedException {
- while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
- Thread.sleep(100);
- }
- }
-
- // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
- // threads will flush it to the bucket and put reference entry in backingMap.
- private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
- Cacheable block) throws InterruptedException {
- cache.cacheBlock(cacheKey, block);
- waitUntilFlushedToBucket(cache, cacheKey);
- }
-}