You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/03/22 14:56:44 UTC

[hbase] branch master updated: HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a60a61  HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
2a60a61 is described below

commit 2a60a61a73d3b001324e159397b2083dfcbbc2aa
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sun Mar 22 22:56:30 2020 +0800

    HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   1 +
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |   6 +-
 .../hadoop/hbase/wal/TestWALSplitToHFile.java      | 168 ++++++++++++++++-----
 3 files changed, 133 insertions(+), 42 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 11a1018..a948355 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5449,6 +5449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             store.assertBulkLoadHFileOk(filePath);
           } catch (IOException e) {
             handleException(fs.getFileSystem(), filePath, e);
+            continue;
           }
           Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
           store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 8011a8b..3ff1e70 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -345,16 +345,16 @@ public final class WALSplitUtil {
 
   /**
    * Move aside a bad edits file.
-   * @param walFS WAL FileSystem used to rename bad edits file.
+   * @param fs the file system used to rename bad edits file.
    * @param edits Edits file to move aside.
    * @return The name of the moved aside file.
    * @throws IOException
    */
-  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
+  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
       throws IOException {
     Path moveAsideName =
         new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
-    if (!walFS.rename(edits, moveAsideName)) {
+    if (!fs.rename(edits, moveAsideName)) {
       LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
     }
     return moveAsideName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
index eaf6b60..a7fb731 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.wal;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
 import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
@@ -32,6 +33,9 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -61,10 +66,12 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -95,6 +102,11 @@ public class TestWALSplitToHFile {
   private Configuration conf;
   private WALFactory wals;
 
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] VALUE1 = Bytes.toBytes("value1");
+  private static final byte[] VALUE2 = Bytes.toBytes("value2");
+  private static final int countPerFamily = 10;
+
   @Rule
   public final TestName TEST_NAME = new TestName();
 
@@ -116,6 +128,7 @@ public class TestWALSplitToHFile {
   @Before
   public void setUp() throws Exception {
     this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
+    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
     this.fs = UTIL.getDFSCluster().getFileSystem();
     this.rootDir = FSUtils.getRootDir(this.conf);
     this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -163,24 +176,93 @@ public class TestWALSplitToHFile {
     return wal;
   }
 
-  /**
-   * Test writing edits into an HRegion, closing it, splitting logs, opening
-   * Region again.  Verify seqids.
-   */
-  @Test
-  public void testReplayEditsWrittenViaHRegion()
-      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
     final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
     final TableDescriptor td = createBasic3FamilyTD(tableName);
     final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
     final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
     deleteDir(tableDir);
     FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    final byte[] rowName = tableName.getName();
-    final int countPerFamily = 10;
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    return new Pair<>(td, ri);
+  }
+
+  @Test
+  public void testCorruptRecoveredHFile() throws Exception {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    final long timestamp = this.ee.currentTime();
+    // Write data and flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
+    }
+    region.flush(true);
+
+    // Now assert edits made it in.
+    Result result1 = region.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result1.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
+    }
+
+    // Now close the region
+    region.close(true);
+    wal.shutdown();
+    // split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // Write a corrupt recovered hfile
+    Path regionDir =
+        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      FileStatus[] files =
+          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
+      assertNotNull(files);
+      assertTrue(files.length > 0);
+      writeCorruptRecoveredHFile(files[0].getPath());
+    }
+
+    // Reopening the region should fail because skipping replay errors is disabled
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    try {
+      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+      fail("Should fail to open region");
+    } catch (CorruptHFileException che) {
+      // Expected
+    }
+
+    // Set skip errors to true and reopen the region
+    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+    Result result2 = region2.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result2.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
+      // Assert the corrupt file was skipped and still exists
+      FileStatus[] files =
+          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
+      assertNotNull(files);
+      assertEquals(1, files.length);
+      assertTrue(files[0].getPath().getName().contains("corrupt"));
+    }
+  }
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again.  Verify seqids.
+   */
+  @Test
+  public void testWrittenViaHRegion()
+      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
 
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
     // Write countPerFamily edits into the three families.  Do a flush on one
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
@@ -189,7 +271,7 @@ public class TestWALSplitToHFile {
     long seqid = region.getOpenSeqNum();
     boolean first = true;
     for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
-      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
       if (first) {
         // If first, so we have at least one family w/ different seqid to rest.
         region.flush(true);
@@ -197,7 +279,7 @@ public class TestWALSplitToHFile {
       }
     }
     // Now assert edits made it in.
-    final Get g = new Get(rowName);
+    final Get g = new Get(ROW);
     Result result = region.get(g);
     assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
     // Now close the region (without flush), split the log, reopen the region and assert that
@@ -222,14 +304,14 @@ public class TestWALSplitToHFile {
     // out from under it and assert that replay of the log adds the edits back
     // correctly when region is opened again.
     for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
-      addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
+      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
     }
     // Get count of edits.
     final Result result2 = region2.get(g);
     assertEquals(2 * result.size(), result2.size());
     wal2.sync();
     final Configuration newConf = HBaseConfiguration.create(this.conf);
-    User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
+    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
     user.runAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
@@ -237,6 +319,7 @@ public class TestWALSplitToHFile {
         FileSystem newFS = FileSystem.get(newConf);
         // Make a new wal for new region open.
         WAL wal3 = createWAL(newConf, rootDir, logName);
+        Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName());
         HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
         long seqid3 = region3.initialize();
         Result result3 = region3.get(g);
@@ -262,18 +345,12 @@ public class TestWALSplitToHFile {
    * We restart Region again, and verify that the edits were replayed.
    */
   @Test
-  public void testReplayEditsAfterPartialFlush()
+  public void testAfterPartialFlush()
       throws IOException, SecurityException, IllegalArgumentException {
-    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
-    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
-    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
-    deleteDir(tableDir);
-    final byte[] rowName = tableName.getName();
-    final int countPerFamily = 10;
-    final TableDescriptor td = createBasic3FamilyTD(tableName);
-    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
     // Write countPerFamily edits into the three families.  Do a flush on one
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
@@ -281,11 +358,11 @@ public class TestWALSplitToHFile {
     HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
     long seqid = region.getOpenSeqNum();
     for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
-      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
     }
 
     // Now assert edits made it in.
-    final Get g = new Get(rowName);
+    final Get g = new Get(ROW);
     Result result = region.get(g);
     assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
 
@@ -323,15 +400,11 @@ public class TestWALSplitToHFile {
    * and flush again, at last verify the data.
    */
   @Test
-  public void testReplayEditsAfterAbortingFlush() throws IOException {
-    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
-    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
-    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
-    deleteDir(tableDir);
-    final TableDescriptor td = createBasic3FamilyTD(tableName);
-    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
+  public void testAfterAbortingFlush() throws IOException {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
     // Write countPerFamily edits into the three families. Do a flush on one
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
@@ -347,7 +420,7 @@ public class TestWALSplitToHFile {
     int writtenRowCount = 10;
     List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
     for (int i = 0; i < writtenRowCount; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
       put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
           Bytes.toBytes("val"));
       region.put(put);
@@ -372,7 +445,7 @@ public class TestWALSplitToHFile {
     // writing more data
     int moreRow = 10;
     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
       put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
           Bytes.toBytes("val"));
       region.put(put);
@@ -414,4 +487,21 @@ public class TestWALSplitToHFile {
     }
     return scannedCount;
   }
+
+  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
+    // Read the recovered hfile
+    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
+    FSDataInputStream in = fs.open(recoveredHFile);
+    byte[] fileContent = new byte[fileSize];
+    in.readFully(0, fileContent, 0, fileSize);
+    in.close();
+
+    // Write a corrupt hfile by appending garbage to a copy of the recovered hfile
+    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
+    FSDataOutputStream out;
+    out = fs.create(path);
+    out.write(fileContent);
+    out.write(Bytes.toBytes("-----"));
+    out.close();
+  }
 }