Posted to commits@hbase.apache.org by re...@apache.org on 2019/09/20 06:10:55 UTC
[hbase] branch branch-1.4 updated: HBASE-22890 Verify the file integrity in persistent IOEngine
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 29080ed HBASE-22890 Verify the file integrity in persistent IOEngine
29080ed is described below
commit 29080eda9859a3689910eab285a3e51481f772ff
Author: zbq.dean <zb...@gmail.com>
AuthorDate: Fri Sep 20 14:09:34 2019 +0800
HBASE-22890 Verify the file integrity in persistent IOEngine
Signed-off-by: Anoop Sam John <an...@apache.org>
Signed-off-by: stack <st...@apache.org>
Signed-off-by: Reid Chan <re...@apache.org>
---
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 81 ++++--
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 88 ++++++-
.../hbase/io/hfile/bucket/PersistentIOEngine.java | 44 ++++
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 282 +++++++++++++++++++++
4 files changed, 473 insertions(+), 22 deletions(-)
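
[Editorial note] In brief: at shutdown the patch below records a checksum of the backing cache files alongside the persisted maps, and on restart it recomputes that checksum and starts with an empty cache if the files no longer match. The digest covers each cache file's path, size, and last modified time. A minimal, self-contained sketch of that hashing scheme (not the patch's exact code: the patch shells out to du for the on-disk size, and the path below is hypothetical):

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public final class ChecksumSketch {
      /** Digest each cache file's path, size, and mtime, as the patch does. */
      static byte[] checksum(String[] filePaths, String algorithm) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (String filePath : filePaths) {
          File file = new File(filePath);
          sb.append(filePath).append(file.length()).append(file.lastModified());
        }
        MessageDigest md = MessageDigest.getInstance(algorithm);
        md.update(sb.toString().getBytes(StandardCharsets.UTF_8));
        return md.digest();
      }

      public static void main(String[] args) throws Exception {
        // "MD5" is the patch's default; it can be overridden with
        // hbase.bucketcache.persistent.file.integrity.check.algorithm.
        System.out.println(checksum(new String[] {"/tmp/bucket.cache"}, "MD5").length
            + "-byte digest");
      }
    }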
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5a4ac13..5c02166 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -242,6 +244,16 @@ public class BucketCache implements BlockCache, HeapSize {
/** In-memory bucket size */
private float memoryFactor;
+ private static final String FILE_VERIFY_ALGORITHM =
+ "hbase.bucketcache.persistent.file.integrity.check.algorithm";
+ private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
+
+ /**
+ * Use one of {@link java.security.MessageDigest}'s hash algorithms to check
+ * persistent file integrity; the default algorithm is MD5.
+ */
+ private String algorithm;
+
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
@@ -252,8 +264,9 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
- throws FileNotFoundException, IOException {
- this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
+ throws IOException {
+ this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
+ ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -295,7 +308,7 @@ public class BucketCache implements BlockCache, HeapSize {
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
- LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
+ LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
throw new RuntimeException(cnfe);
}
}
@@ -1021,41 +1034,69 @@ public class BucketCache implements BlockCache, HeapSize {
private void persistToFile() throws IOException {
assert !cacheEnabled;
- FileOutputStream fos = null;
- ObjectOutputStream oos = null;
- try {
+ try (ObjectOutputStream oos = new ObjectOutputStream(
+ new FileOutputStream(persistencePath, false))) {
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
- fos = new FileOutputStream(persistencePath, false);
- oos = new ObjectOutputStream(fos);
+ byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum(algorithm);
+ if (checksum != null) {
+ oos.write(ProtobufUtil.PB_MAGIC);
+ oos.writeInt(checksum.length);
+ oos.write(checksum);
+ }
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
- } finally {
- if (oos != null) oos.close();
- if (fos != null) fos.close();
}
}
@SuppressWarnings("unchecked")
- private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
+ private void retrieveFromFile(int[] bucketSizes) throws IOException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
- FileInputStream fis = null;
ObjectInputStream ois = null;
try {
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
- fis = new FileInputStream(persistencePath);
- ois = new ObjectInputStream(fis);
+ ois = new ObjectInputStream(new FileInputStream(persistencePath));
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ byte[] pbuf = new byte[pblen];
+ int read = ois.read(pbuf);
+ if (read != pblen) {
+ LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
+ "checking for protobuf magic number. Requested=" + pblen + ", but received= " +
+ read + ".");
+ return;
+ }
+ if (Bytes.equals(ProtobufUtil.PB_MAGIC, pbuf)) {
+ int length = ois.readInt();
+ byte[] persistentChecksum = new byte[length];
+ int readLen = ois.read(persistentChecksum);
+ if (readLen != length) {
+ LOG.warn("Can't restore from file because of incorrect number of bytes read while " +
+ "checking for persistent checksum. Requested=" + length + ", but received=" +
+ readLen + ".");
+ return;
+ }
+ if (!((PersistentIOEngine) ioEngine).verifyFileIntegrity(
+ persistentChecksum, algorithm)) {
+ LOG.warn("Can't restore from file because of verification failed.");
+ return;
+ }
+ } else {
+ // The persistent file may be an older version that does not support verification,
+ // so reopen the ObjectInputStream and read the persistent file from the beginning.
+ ois.close();
+ ois = new ObjectInputStream(new FileInputStream(persistencePath));
+ }
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
@@ -1079,8 +1120,9 @@ public class BucketCache implements BlockCache, HeapSize {
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
} finally {
- if (ois != null) ois.close();
- if (fis != null) fis.close();
+ if (ois != null) {
+ ois.close();
+ }
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
@@ -1598,4 +1640,9 @@ public class BucketCache implements BlockCache, HeapSize {
float getMemoryFactor() {
return memoryFactor;
}
+
+ @VisibleForTesting
+ public UniqueIndexMap<Integer> getDeserialiserMap() {
+ return deserialiserMap;
+ }
}
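
[Editorial note] For reference, the persistence file written by persistToFile() above now carries a small header ahead of the pre-existing payload (capacity, engine and map class names, deserialiserMap, backingMap). A minimal sketch of the writer side, assuming PB_MAGIC is the four 'PBUF' bytes that ProtobufUtil uses; the path and checksum arguments are hypothetical:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    public final class HeaderSketch {
      // The same four magic bytes as ProtobufUtil.PB_MAGIC.
      private static final byte[] PB_MAGIC = {'P', 'B', 'U', 'F'};

      /** Write the new header: magic bytes, checksum length, checksum bytes. */
      static void writeHeader(String path, byte[] checksum) throws IOException {
        try (ObjectOutputStream oos =
            new ObjectOutputStream(new FileOutputStream(path, false))) {
          oos.write(PB_MAGIC);
          oos.writeInt(checksum.length);
          oos.write(checksum);
          // ... capacity, class names, and the serialized maps follow, as before ...
        }
      }
    }

On the read side, retrieveFromFile() checks for the magic bytes first; if they are absent it reopens the stream and falls back to the old, header-less format.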
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 7d3a9fa..f631836 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
@@ -32,15 +34,19 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
/**
* IO engine that stores data to a file on the local file system.
*/
@InterfaceAudience.Private
-public class FileIOEngine implements IOEngine {
+public class FileIOEngine implements PersistentIOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
public static final String FILE_DELIMITER = ",";
+ private static final DuFileCommand DU = new DuFileCommand(new String[] {"du", ""});
+
private final String[] filePaths;
private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs;
@@ -68,15 +74,20 @@ public class FileIOEngine implements IOEngine {
// The next setLength() call will throw an exception; this message is logged
// just to record the detailed reason for the exception.
String msg = "Only " + StringUtils.byteDesc(totalSpace)
- + " total space under " + filePath + ", not enough for requested "
- + StringUtils.byteDesc(sizePerFile);
+ + " total space under " + filePath + ", not enough for requested "
+ + StringUtils.byteDesc(sizePerFile);
LOG.warn(msg);
}
- rafs[i].setLength(sizePerFile);
+ File file = new File(filePath);
+ // The setLength() method changes the file's last modified time, so without
+ // this check the wrong time would be used when calculating the checksum.
+ if (file.length() != sizePerFile) {
+ rafs[i].setLength(sizePerFile);
+ }
fileChannels[i] = rafs[i].getChannel();
channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
- + ", on the path:" + filePath);
+ + ", on the path: " + filePath);
} catch (IOException fex) {
LOG.error("Failed allocating cache on " + filePath, fex);
shutdown();
@@ -86,6 +97,18 @@ public class FileIOEngine implements IOEngine {
}
@Override
+ public boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm) {
+ byte[] calculatedChecksum = calculateChecksum(algorithm);
+ if (!Bytes.equals(persistentChecksum, calculatedChecksum)) {
+ LOG.error("Mismatch of checksum! The persistent checksum is " +
+ Bytes.toString(persistentChecksum) + ", but the calculated checksum is " +
+ Bytes.toString(calculatedChecksum));
+ return false;
+ }
+ return true;
+ }
+
+ @Override
public String toString() {
return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+ Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
@@ -267,6 +290,61 @@ public class FileIOEngine implements IOEngine {
}
}
+ @Override
+ public byte[] calculateChecksum(String algorithm) {
+ if (filePaths == null) {
+ return null;
+ }
+ try {
+ StringBuilder sb = new StringBuilder();
+ for (String filePath : filePaths) {
+ File file = new File(filePath);
+ sb.append(filePath);
+ sb.append(getFileSize(filePath));
+ sb.append(file.lastModified());
+ }
+ MessageDigest messageDigest = MessageDigest.getInstance(algorithm);
+ messageDigest.update(Bytes.toBytes(sb.toString()));
+ return messageDigest.digest();
+ } catch (IOException ioex) {
+ LOG.error("Calculating checksum failed.", ioex);
+ return null;
+ } catch (NoSuchAlgorithmException e) {
+ LOG.error("No such algorithm : " + algorithm + "!");
+ return null;
+ }
+ }
+
+ /**
+ * Use the Linux du command to get the file's size on disk
+ * @param filePath the file to measure
+ * @return the file's size on disk
+ * @throws IOException if running the command fails, e.g. the file does not exist
+ */
+ private static long getFileSize(String filePath) throws IOException {
+ DU.setExecCommand(filePath);
+ DU.execute();
+ return Long.parseLong(DU.getOutput().split("\t")[0]);
+ }
+
+ private static class DuFileCommand extends Shell.ShellCommandExecutor {
+ private String[] execCommand;
+
+ DuFileCommand(String[] execString) {
+ super(execString);
+ execCommand = execString;
+ }
+
+ void setExecCommand(String filePath) {
+ this.execCommand[1] = filePath;
+ }
+
+ @Override
+ public String[] getExecString() {
+ return this.execCommand;
+ }
+ }
+
private static interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
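
[Editorial note] As an aside on DuFileCommand: it subclasses Hadoop's Shell.ShellCommandExecutor only so one executor can be reused with a swapped-in path. Functionally it is equivalent to this throwaway sketch (hypothetical path; du prints "<size>\t<path>"):

    import org.apache.hadoop.util.Shell;

    public final class DuSketch {
      public static void main(String[] args) throws Exception {
        // Run `du <file>` and keep the first tab-separated column,
        // as getFileSize() above does.
        Shell.ShellCommandExecutor du =
            new Shell.ShellCommandExecutor(new String[] {"du", "/tmp/bucket.cache"});
        du.execute();
        System.out.println("size on disk: " + du.getOutput().split("\t")[0]);
      }
    }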
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
new file mode 100644
index 0000000..5886c8b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PersistentIOEngine.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An IOEngine that implements this interface supports persistence and file integrity
+ * verification for {@link BucketCache}
+ */
+@InterfaceAudience.Private
+public interface PersistentIOEngine extends IOEngine {
+
+ /**
+ * Calculate a checksum with the given hash algorithm; the default algorithm is MD5
+ * @param algorithm the algorithm used to calculate the checksum
+ * @return the calculated checksum bytes
+ */
+ byte[] calculateChecksum(String algorithm);
+
+ /**
+ * Verify the cache files' integrity
+ * @param persistentChecksum the previously persisted checksum
+ * @param algorithm the algorithm used to calculate the checksum
+ * @return true if verification succeeds
+ */
+ boolean verifyFileIntegrity(byte[] persistentChecksum, String algorithm);
+}
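
[Editorial note] In use, the two methods bracket the cache's lifetime: BucketCache casts its engine to PersistentIOEngine, calls calculateChecksum() while persisting, and calls verifyFileIntegrity() with the stored bytes while restoring. A hedged consumer sketch (the helper names here are illustrative, not part of the patch):

    import org.apache.hadoop.hbase.io.hfile.bucket.PersistentIOEngine;

    public final class EngineUseSketch {
      /** Persist-time: snapshot the engine's current checksum (may be null). */
      static byte[] snapshot(PersistentIOEngine engine, String algorithm) {
        return engine.calculateChecksum(algorithm);
      }

      /** Restore-time: trust the persisted maps only if the files still match. */
      static boolean canRestore(PersistentIOEngine engine, byte[] persisted,
          String algorithm) {
        return persisted != null && engine.verifyFileIntegrity(persisted, algorithm);
      }
    }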
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
new file mode 100644
index 0000000..f86df96
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -0,0 +1,282 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for checking file integrity when BucketCache retrieves from file
+ */
+@Category(SmallTests.class)
+public class TestVerifyBucketCacheFile {
+ final int constructedBlockSize = 8 * 1024;
+ final long capacitySize = 32 * 1024 * 1024;
+ final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+ final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+
+ /**
+ * Test whether BucketCache starts normally when the cache file or the persistence file
+ * does not exist.
+ * (1) Start BucketCache and add some blocks, then shut down BucketCache and persist the
+ * cache to file. Restart BucketCache; it can restore the cache from file.
+ * (2) Delete the bucket cache file after shutting down BucketCache. Restart BucketCache;
+ * it can't restore the cache from file, and the cache file and persistence file are
+ * deleted before BucketCache starts normally.
+ * (3) Delete the persistence file after shutting down BucketCache. Restart BucketCache;
+ * it can't restore the cache from file, and the cache file and persistence file are
+ * deleted before BucketCache starts normally.
+ * @throws Exception the exception
+ */
+ @Test
+ public void testRetrieveFromFile() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+ BucketCache bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize == 0);
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize != 0);
+ // 1. persist cache to file
+ bucketCache.shutdown();
+ // restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // 2. delete bucket cache file
+ File cacheFile = new File(testDir + "/bucket.cache");
+ assertTrue(cacheFile.delete());
+ // can't restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize != 0);
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // 3. delete backingMap persistence file
+ File mapFile = new File(testDir + "/bucket.persistence");
+ assertTrue(mapFile.delete());
+ // can't restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ /**
+ * Test whether BucketCache starts normally after the cache file's data is modified.
+ * Start BucketCache and add some blocks, then shut down BucketCache and persist the cache
+ * to file. Restart BucketCache after modifying the cache file's data; it can't restore the
+ * cache from file, and the cache file and persistence file are deleted before BucketCache
+ * starts normally.
+ * @throws Exception the exception
+ */
+ @Test
+ public void testModifiedBucketCacheFileData() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+ BucketCache bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize == 0);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize != 0);
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // modify the bucket cache file's data
+ String file = testDir + "/bucket.cache";
+ try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(file, false)))) {
+ out.write("test bucket cache");
+ }
+
+ // can't restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ /**
+ * Test whether BucketCache starts normally after the cache file's last modified time is
+ * changed. First start BucketCache and add some blocks, then shut down BucketCache and
+ * persist the cache to file. Then restart BucketCache after modifying the cache file's
+ * last modified time; it can't restore the cache from file, and the cache file and
+ * persistence file are deleted before BucketCache starts normally.
+ * @throws Exception the exception
+ */
+ @Test
+ public void testModifiedBucketCacheFileTime() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+ BucketCache bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize == 0);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize != 0);
+ // persist cache to file
+ bucketCache.shutdown();
+
+ // modify the bucket cache file's last modified time
+ File file = new File(testDir + "/bucket.cache");
+ assertTrue(file.setLastModified(System.currentTimeMillis() + 1000));
+
+ // can't restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, testDir + "/bucket.persistence");
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ /**
+ * Test whether an old-version persistence file can still be read, for backward
+ * compatibility. Start BucketCache and add some blocks, then persist the cache to file in
+ * the old way and shut down BucketCache. Restart BucketCache; it restores normally from
+ * the old-version persistence file.
+ * @throws Exception the exception
+ */
+ @Test
+ public void compatibilityTest() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ String persistencePath = testDir + "/bucket.persistence";
+ BucketCache bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, persistencePath);
+ long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize == 0);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(usedSize != 0);
+ // persist the backingMap using the old format
+ persistToFileInOldWay(persistencePath + ".old", bucketCache.getMaxSize(),
+ bucketCache.backingMap, bucketCache.getDeserialiserMap());
+ bucketCache.shutdown();
+
+ // restore cache from file, skipping the checksum check
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
+ null, writeThreads, writerQLen, persistencePath + ".old");
+ assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+ assertEquals(blocks.length, bucketCache.backingMap.size());
+ }
+
+ private void persistToFileInOldWay(String persistencePath, long cacheCapacity,
+ ConcurrentMap backingMap, UniqueIndexMap deserialiserMap)
+ throws IOException {
+ try (ObjectOutputStream oos = new ObjectOutputStream(
+ new FileOutputStream(persistencePath, false))) {
+ oos.writeLong(cacheCapacity);
+ oos.writeUTF(FileIOEngine.class.getName());
+ oos.writeUTF(backingMap.getClass().getName());
+ oos.writeObject(deserialiserMap);
+ oos.writeObject(backingMap);
+ }
+ }
+
+ private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+ throws InterruptedException {
+ while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+ Thread.sleep(100);
+ }
+ }
+
+ // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
+ // threads will flush it to the bucket and put reference entry in backingMap.
+ private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+ Cacheable block) throws InterruptedException {
+ cache.cacheBlock(cacheKey, block);
+ waitUntilFlushedToBucket(cache, cacheKey);
+ }
+}
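
[Editorial note] Finally, a note on configuration: the algorithm used for the integrity check is read from the new hbase.bucketcache.persistent.file.integrity.check.algorithm property, and any name accepted by java.security.MessageDigest should work. A small sketch of overriding the MD5 default (SHA-256 chosen only as an example):

    import org.apache.hadoop.conf.Configuration;

    public final class ConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Passed to BucketCache's constructor; MD5 is used when unset.
        conf.set("hbase.bucketcache.persistent.file.integrity.check.algorithm",
            "SHA-256");
        System.out.println(
            conf.get("hbase.bucketcache.persistent.file.integrity.check.algorithm"));
      }
    }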