Posted to commits@hbase.apache.org by zh...@apache.org on 2022/01/25 07:14:52 UTC

[hbase] 02/02: HBASE-26700 The way we bypass broken track file is not enough in StoreFileListFile (#4055)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3021c585137b1aa4b716cfa7258f07fd08e79545
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Jan 25 14:51:11 2022 +0800

    HBASE-26700 The way we bypass broken track file is not enough in StoreFileListFile (#4055)
    
    Signed-off-by: Wellington Ramos Chevreuil <wc...@apache.org>
---
 .../storefiletracker/StoreFileListFile.java        |  57 +++++--
 .../master/procedure/TestCreateTableProcedure.java |   4 +-
 .../regionserver/TestMergesSplitsAddToTracker.java |   9 +-
 ...leTracker.java => StoreFileTrackerForTest.java} |   6 +-
 .../storefiletracker/TestStoreFileListFile.java    | 165 +++++++++++++++++++++
 5 files changed, 216 insertions(+), 25 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
index ffb3647..ced0118 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.storefiletracker;
 
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.zip.CRC32;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,9 +31,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
 
 /**
@@ -42,18 +41,27 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.
  * other file.
  * <p/>
  * So in this way, we could avoid listing when we want to load the store file list file.
+ * <p/>
+ * To prevent loading a partial file, we use the first 4 bytes as the file length, and also append
+ * a 4-byte CRC32 checksum at the end. This is because the protobuf message parser can sometimes
+ * return without error on partial bytes if you stop at certain special points, but the returned
+ * message will have incorrect field values. We should try our best to prevent this from happening,
+ * because loading an incorrect store file list file usually leads to data loss.
  */
 @InterfaceAudience.Private
 class StoreFileListFile {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);
 
-  private static final String TRACK_FILE_DIR = ".filelist";
+  static final String TRACK_FILE_DIR = ".filelist";
 
   private static final String TRACK_FILE = "f1";
 
   private static final String TRACK_FILE_ROTATE = "f2";
 
+  // 16 MB, which is big enough for a tracker file
+  private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
+
   private final StoreContext ctx;
 
   private final Path trackFileDir;
@@ -74,16 +82,26 @@ class StoreFileListFile {
 
   private StoreFileList load(Path path) throws IOException {
     FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
-    byte[] bytes;
+    byte[] data;
+    int expectedChecksum;
     try (FSDataInputStream in = fs.open(path)) {
-      bytes = ByteStreams.toByteArray(in);
+      int length = in.readInt();
+      if (length <= 0 || length > MAX_FILE_SIZE) {
+        throw new IOException("Invalid file length " + length +
+          ", either less than 0 or greater then max allowed size " + MAX_FILE_SIZE);
+      }
+      data = new byte[length];
+      in.readFully(data);
+      expectedChecksum = in.readInt();
     }
-    // Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException
-    // here. This is very important for upper layer to determine whether this is the normal case,
-    // where the file does not exist or is incomplete. If there is another type of exception, the
-    // upper layer should throw it out instead of just ignoring it, otherwise it will lead to data
-    // loss.
-    return StoreFileList.parseFrom(bytes);
+    CRC32 crc32 = new CRC32();
+    crc32.update(data);
+    int calculatedChecksum = (int) crc32.getValue();
+    if (expectedChecksum != calculatedChecksum) {
+      throw new IOException(
+        "Checksum mismatch, expected " + expectedChecksum + ", actual " + calculatedChecksum);
+    }
+    return StoreFileList.parseFrom(data);
   }
 
   private int select(StoreFileList[] lists) {
@@ -101,9 +119,9 @@ class StoreFileListFile {
     for (int i = 0; i < 2; i++) {
       try {
         lists[i] = load(trackFiles[i]);
-      } catch (FileNotFoundException | InvalidProtocolBufferException e) {
+      } catch (FileNotFoundException | EOFException e) {
         // this is normal case, so use info and do not log stacktrace
-        LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
+        LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
       }
     }
     int winnerIndex = select(lists);
@@ -124,10 +142,17 @@ class StoreFileListFile {
       // we need to call load first to load the prevTimestamp and also the next file
       load();
     }
-    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
     long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
+    byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
+    CRC32 crc32 = new CRC32();
+    crc32.update(actualData);
+    int checksum = (int) crc32.getValue();
+    // 4-byte length at the beginning, plus a 4-byte checksum at the end
+    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
     try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
-      builder.setTimestamp(timestamp).build().writeTo(out);
+      out.writeInt(actualData.length);
+      out.write(actualData);
+      out.writeInt(checksum);
     }
     // record timestamp
     prevTimestamp = timestamp;
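
To make the new on-disk layout concrete, here is a minimal, self-contained sketch of the framing
the patch introduces: a 4-byte length prefix, the serialized payload (the StoreFileList protobuf
in the real code), then a 4-byte CRC32 of the payload. The FramedPayloadSketch class and its
frame/unframe helpers are illustrative only, not part of HBase; they just mirror the write and
read paths in the diff above.

// A minimal, self-contained sketch of the framing the patch introduces:
// [4-byte length][payload][4-byte CRC32 of payload]. The class and helper
// names are illustrative only; they are not part of HBase.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class FramedPayloadSketch {

  // Mirrors the 16 MB cap the patch places on a tracker file.
  private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;

  static byte[] frame(byte[] payload) throws IOException {
    CRC32 crc32 = new CRC32();
    crc32.update(payload);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bos)) {
      out.writeInt(payload.length);         // 4-byte length prefix
      out.write(payload);                   // the protobuf bytes in the real code
      out.writeInt((int) crc32.getValue()); // 4-byte CRC32 trailer
    }
    return bos.toByteArray();
  }

  static byte[] unframe(byte[] framed) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
      int length = in.readInt(); // EOFException if truncated before the prefix ends
      if (length <= 0 || length > MAX_FILE_SIZE) {
        throw new IOException("Invalid file length " + length);
      }
      byte[] payload = new byte[length];
      in.readFully(payload);       // EOFException on a partial payload
      int expected = in.readInt(); // EOFException on a missing trailer
      CRC32 crc32 = new CRC32();
      crc32.update(payload);
      if (expected != (int) crc32.getValue()) {
        throw new IOException("Checksum mismatch"); // corruption, not truncation
      }
      return payload;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] framed = frame("store file list".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
  }
}

Note how the read side maps failure modes: a truncated file trips readInt/readFully and surfaces
as EOFException, which load() above now tolerates alongside FileNotFoundException, while a
checksum mismatch surfaces as a plain IOException that callers must propagate, since silently
ignoring it risks data loss.
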
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
index 51ea9f5..bb9985e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
-import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -96,7 +96,7 @@ public class TestCreateTableProcedure extends TestTableDDLProcedureBase {
     final TableName tableName = TableName.valueOf(name.getMethodName());
     ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
     TableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, F1);
-    String trackerName = TestStoreFileTracker.class.getName();
+    String trackerName = StoreFileTrackerForTest.class.getName();
     htd = TableDescriptorBuilder.newBuilder(htd).setValue(TRACKER_IMPL, trackerName).build();
     RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null);
     long procId = ProcedureTestingUtility.submitAndWait(procExec,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java
index 2cbfdea..703d619 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMergesSplitsAddToTracker.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -86,13 +86,13 @@ public class TestMergesSplitsAddToTracker {
 
   @Before
   public void setup(){
-    TestStoreFileTracker.clear();
+    StoreFileTrackerForTest.clear();
   }
 
   private TableName createTable(byte[] splitKey) throws IOException {
     TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
-      .setValue(TRACKER_IMPL, TestStoreFileTracker.class.getName()).build();
+      .setValue(TRACKER_IMPL, StoreFileTrackerForTest.class.getName()).build();
     if (splitKey != null) {
       TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
     } else {
@@ -247,7 +247,8 @@ public class TestMergesSplitsAddToTracker {
 
   private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
     for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
-      assertTrue(TestStoreFileTracker.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
+      assertTrue(
+        StoreFileTrackerForTest.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java
similarity index 91%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java
index c89e151..abef80a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerForTest.java
@@ -32,14 +32,14 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestStoreFileTracker extends DefaultStoreFileTracker {
+public class StoreFileTrackerForTest extends DefaultStoreFileTracker {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerForTest.class);
   private static ConcurrentMap<String, BlockingQueue<StoreFileInfo>> trackedFiles =
     new ConcurrentHashMap<>();
   private String storeId;
 
-  public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
+  public StoreFileTrackerForTest(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
     super(conf, isPrimaryReplica, ctx);
     if (ctx != null && ctx.getRegionFileSystem() != null) {
       this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java
new file mode 100644
index 0000000..2aba24b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileListFile.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestStoreFileListFile {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStoreFileListFile.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileListFile.class);
+
+  private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private Path testDir;
+
+  private StoreFileListFile storeFileListFile;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws IOException {
+    testDir = UTIL.getDataTestDir(name.getMethodName());
+    HRegionFileSystem hfs = mock(HRegionFileSystem.class);
+    when(hfs.getFileSystem()).thenReturn(FileSystem.get(UTIL.getConfiguration()));
+    StoreContext ctx = StoreContext.getBuilder().withFamilyStoreDirectoryPath(testDir)
+      .withRegionFileSystem(hfs).build();
+    storeFileListFile = new StoreFileListFile(ctx);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testEmptyLoad() throws IOException {
+    assertNull(storeFileListFile.load());
+  }
+
+  private FileStatus getOnlyTrackerFile(FileSystem fs) throws IOException {
+    return fs.listStatus(new Path(testDir, StoreFileListFile.TRACK_FILE_DIR))[0];
+  }
+
+  private byte[] readAll(FileSystem fs, Path file) throws IOException {
+    try (FSDataInputStream in = fs.open(file)) {
+      return ByteStreams.toByteArray(in);
+    }
+  }
+
+  private void write(FileSystem fs, Path file, byte[] buf, int off, int len) throws IOException {
+    try (FSDataOutputStream out = fs.create(file, true)) {
+      out.write(buf, off, len);
+    }
+  }
+
+  @Test
+  public void testLoadPartial() throws IOException {
+    StoreFileList.Builder builder = StoreFileList.newBuilder();
+    storeFileListFile.update(builder);
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
+    // truncate it so we do not have enough data
+    LOG.info("Truncate file {} with size {} to {}", trackerFileStatus.getPath(),
+      trackerFileStatus.getLen(), trackerFileStatus.getLen() / 2);
+    byte[] content = readAll(fs, trackerFileStatus.getPath());
+    write(fs, trackerFileStatus.getPath(), content, 0, content.length / 2);
+    assertNull(storeFileListFile.load());
+  }
+
+  private void writeInt(byte[] buf, int off, int value) {
+    byte[] b = Bytes.toBytes(value);
+    for (int i = 0; i < 4; i++) {
+      buf[off + i] = b[i];
+    }
+  }
+
+  @Test
+  public void testZeroFileLength() throws IOException {
+    StoreFileList.Builder builder = StoreFileList.newBuilder();
+    storeFileListFile.update(builder);
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
+    // write a zero length
+    byte[] content = readAll(fs, trackerFileStatus.getPath());
+    writeInt(content, 0, 0);
+    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
+    assertThrows(IOException.class, () -> storeFileListFile.load());
+  }
+
+  @Test
+  public void testBigFileLength() throws IOException {
+    StoreFileList.Builder builder = StoreFileList.newBuilder();
+    storeFileListFile.update(builder);
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
+    // write a large length
+    byte[] content = readAll(fs, trackerFileStatus.getPath());
+    writeInt(content, 0, 128 * 1024 * 1024);
+    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
+    assertThrows(IOException.class, () -> storeFileListFile.load());
+  }
+
+  @Test
+  public void testChecksumMismatch() throws IOException {
+    StoreFileList.Builder builder = StoreFileList.newBuilder();
+    storeFileListFile.update(builder);
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    FileStatus trackerFileStatus = getOnlyTrackerFile(fs);
+    // flip one byte
+    byte[] content = readAll(fs, trackerFileStatus.getPath());
+    content[5] = (byte) ~content[5];
+    write(fs, trackerFileStatus.getPath(), content, 0, content.length);
+    assertThrows(IOException.class, () -> storeFileListFile.load());
+  }
+}
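
The new tests above each corrupt the tracker file in a different way. Reusing the hypothetical
FramedPayloadSketch helpers from the earlier sketch, the snippet below shows the two failure
modes those tests exercise: truncation (as in testLoadPartial) surfaces as EOFException, which
the loader treats as a normal broken-track-file case, while a flipped byte (as in
testChecksumMismatch) surfaces as the checksum IOException that must not be swallowed.
CorruptionDemoSketch is illustrative only, not part of HBase.

// A follow-up to FramedPayloadSketch, mirroring what the corruption tests
// exercise. Illustrative only; assumes FramedPayloadSketch is in scope.
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class CorruptionDemoSketch {
  public static void main(String[] args) throws IOException {
    byte[] framed = FramedPayloadSketch.frame(new byte[] { 1, 2, 3, 4 });

    // Keep only the first half, like the truncate step in testLoadPartial.
    byte[] truncated = Arrays.copyOf(framed, framed.length / 2);
    try {
      FramedPayloadSketch.unframe(truncated);
    } catch (EOFException e) {
      System.out.println("truncated -> EOFException (tolerated as a broken track file)");
    }

    // Invert the bits of one payload byte, like testChecksumMismatch.
    byte[] flipped = framed.clone();
    flipped[5] = (byte) ~flipped[5];
    try {
      FramedPayloadSketch.unframe(flipped);
    } catch (IOException e) {
      System.out.println("bit flip -> " + e.getMessage()); // Checksum mismatch
    }
  }
}
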