You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/03/23 07:50:53 UTC

[hbase] branch branch-2.3 updated: HBASE-23741 Data loss when WAL split to HFile enabled (#1254)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new c519576  HBASE-23741 Data loss when WAL split to HFile enabled (#1254)
c519576 is described below

commit c519576fcbdafa55604536e77977e0c363565934
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Mar 23 14:38:47 2020 +0800

    HBASE-23741 Data loss when WAL split to HFile enabled (#1254)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 10 +--
 .../apache/hadoop/hbase/regionserver/HStore.java   | 36 +++++++++
 .../wal/BoundedRecoveredHFilesOutputSink.java      | 57 +++++++-------
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  | 49 +++++-------
 .../hadoop/hbase/wal/TestWALSplitToHFile.java      | 88 ++++++++++++++++++++++
 5 files changed, 171 insertions(+), 69 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6076aec..aabda40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5390,7 +5390,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
-    Path regionDir = getWALRegionDir();
+    Path regionDir = fs.getRegionDir();
     long maxSeqId = -1;
     for (HStore store : stores) {
       String familyName = store.getColumnFamilyName();
@@ -5403,17 +5403,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           if (isZeroLengthThenDelete(fs.getFileSystem(), file, filePath)) {
             continue;
           }
-
           try {
-            store.assertBulkLoadHFileOk(filePath);
+            HStoreFile storefile = store.tryCommitRecoveredHFile(file.getPath());
+            maxSeqId = Math.max(maxSeqId, storefile.getReader().getSequenceID());
           } catch (IOException e) {
             handleException(fs.getFileSystem(), filePath, e);
             continue;
           }
-          Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
-          store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
-            pair.getSecond());
-          maxSeqId = Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(filePath.getName()));
         }
         if (this.rsServices != null && store.needsCompaction()) {
           this.rsServices.getCompactionRequestor()
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7e5d88b..6103819 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1086,6 +1086,42 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     throw lastException;
   }
 
+  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { // validate, commit, register; returns the new store file
+    LOG.info("Validating recovered hfile at {} for inclusion in store {} region {}", path, this,
+      getRegionInfo().getRegionNameAsString());
+    FileSystem srcFs = path.getFileSystem(conf);
+    srcFs.access(path, FsAction.READ_WRITE); // permission check; expected to throw if READ_WRITE is denied
+    try (HFile.Reader reader = // reader is only used for the key-range check below, then closed
+        HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) {
+      Optional<byte[]> firstKey = reader.getFirstRowKey();
+      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
+      Optional<Cell> lk = reader.getLastKey();
+      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
+      byte[] lastKey = CellUtil.cloneRow(lk.get()); // only the row part of the last key is compared
+      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { // first and last row must both lie in this region
+        throw new WrongRegionException("Recovered hfile " + path.toString() +
+            " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
+      }
+    }
+
+    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); // commit into this column family; dstPath is the resulting location
+    HStoreFile sf = createStoreFileAndReader(dstPath);
+    StoreFileReader r = sf.getReader();
+    this.storeSize.addAndGet(r.length()); // account for the new file in the store-level counters
+    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+
+    this.lock.writeLock().lock(); // the store file list is only modified under the store write lock
+    try {
+      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+
+    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
+      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
+    return sf;
+  }
+
   /**
    * @param path The pathname of the tmp file into which the store was flushed
    * @return store file created.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
index b5f922a..c340265 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -32,15 +32,17 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.CellSet;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
@@ -85,11 +87,14 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
         if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
           continue;
         }
+        PrivateCellUtil.setSequenceId(cell, seqId);
         String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
         // comparator need to be specified for meta
-        familyCells.computeIfAbsent(familyName, key -> new CellSet(
-          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
-          .add(cell);
+        familyCells
+            .computeIfAbsent(familyName,
+              key -> new CellSet(
+                  isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR))
+            .add(cell);
         familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
       }
     }
@@ -105,6 +110,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
         for (Cell cell : cellsEntry.getValue()) {
           writer.append(cell);
         }
+        // Append the max seqid to the hfile; it is used during recovery.
+        writer.appendMetadata(familySeqIds.get(familyName), false);
         regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
           (k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size());
         splits.add(writer.getPath());
@@ -181,44 +188,32 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
 
   private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
       long seqId, String familyName, boolean isMetaTable) throws IOException {
-    Path outputFile = WALSplitUtil
-      .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
-        walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS);
-    checkPathValid(outputFile);
+    Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
+      tableName, regionName, familyName);
     StoreFileWriter.Builder writerBuilder =
         new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
-            .withFilePath(outputFile);
-    HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
-    if (isMetaTable) {
-      hFileContextBuilder.withCellComparator(CellComparatorImpl.META_COMPARATOR);
-    } else {
-      configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
-    }
-    return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
-  }
+            .withOutputDir(outputDir);
 
-  private void configContextForNonMetaWriter(TableName tableName, String familyName,
-      HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
-      throws IOException {
     TableDescriptor tableDesc =
         tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t));
     if (tableDesc == null) {
       throw new IOException("Failed to get table descriptor for table " + tableName);
     }
     ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
-    hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
-        .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding())
-        .withCellComparator(CellComparatorImpl.COMPARATOR);
-    writerBuilder.withBloomType(cfd.getBloomFilterType());
+    HFileContext hFileContext = createFileContext(cfd, isMetaTable);
+    return writerBuilder.withFileContext(hFileContext).withBloomType(cfd.getBloomFilterType())
+        .build();
   }
 
-  private void checkPathValid(Path outputFile) throws IOException {
-    if (walSplitter.rootFS.exists(outputFile)) {
-      LOG.warn("this file {} may be left after last failed split ", outputFile);
-      if (!walSplitter.rootFS.delete(outputFile, false)) {
-        LOG.warn("delete old generated HFile {} failed", outputFile);
-      }
-    }
+  private HFileContext createFileContext(ColumnFamilyDescriptor cfd, boolean isMetaTable)
+      throws IOException { // builds the HFileContext for a recovered hfile of column family cfd
+    return new HFileContextBuilder().withCompression(cfd.getCompressionType())
+        .withChecksumType(HStore.getChecksumType(walSplitter.conf)) // checksum settings come from the splitter conf, not from cfd
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf))
+        .withBlockSize(cfd.getBlocksize()).withCompressTags(cfd.isCompressTags())
+        .withDataBlockEncoding(cfd.getDataBlockEncoding()).withCellComparator(
+          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR) // meta table uses its own comparator
+        .build();
   }
 
   private TableDescriptor getTableDescriptor(TableName tableName) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index d2a8142..a262aa7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -565,42 +565,29 @@ public final class WALSplitUtil {
   }
 
   /**
-   * Path to a file under recovered.hfiles directory of the region's column family: e.g.
-   * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
-   * of recovered.hfiles directory under the region's column family, creating it if necessary.
-   *
-   * @param tableName          the table name
-   * @param encodedRegionName  the encoded region name
-   * @param familyName         the column family name
-   * @param seqId              the sequence id which used to generate file name
+   * Return path to recovered.hfiles directory of the region's column family: e.g.
+   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of
+   * recovered.hfiles directory under the region's column family, creating it if necessary.
+   * @param rootFS the root file system
+   * @param conf configuration
+   * @param tableName the table name
+   * @param encodedRegionName the encoded region name
+   * @param familyName the column family name
+   * @param seqId the sequence id which used to generate file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
-   * @param conf               configuration
-   * @param rootFS             the root file system
-   * @return Path to file into which to dump split log edits.
+   * @return Path to recovered.hfiles directory of the region's column family.
    */
-  static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
-    String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
-    throws IOException {
+  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
+      TableName tableName, String encodedRegionName, String familyName) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
-    Path regionDir =
-      FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
-    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
-
+    Path regionDir = FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName),
+      encodedRegionName);
+    Path dir = getRecoveredHFilesDir(regionDir, familyName);
     if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
       LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
         familyName);
     }
-
-    String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
-    return new Path(dir, fileName);
-  }
-
-  private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
-    return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
-  }
-
-  public static long getSeqIdForRecoveredHFile(String fileName) {
-    return Long.parseLong(fileName.split("-")[0]);
+    return dir;
   }
 
   /**
@@ -608,13 +595,13 @@ public final class WALSplitUtil {
    * @param familyName The column family name
    * @return The directory that holds recovered hfiles for the region's column family
    */
-  private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
+  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
     return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
   }
 
   public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
       final Path regionDir, String familyName) throws IOException {
-    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+    Path dir = getRecoveredHFilesDir(regionDir, familyName);
     return FSUtils.listStatus(rootFS, dir);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
index a7fb731..5d762dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -30,7 +30,9 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -252,6 +254,92 @@ public class TestWALSplitToHFile {
     }
   }
 
+  @Test
+  public void testPutWithSameTimestamp() throws Exception { // the unflushed VALUE2 put must win over the flushed VALUE1 put after recovery
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    final long timestamp = this.ee.currentTime(); // one timestamp shared by both rounds of puts
+    // Write data and flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
+    }
+    region.flush(true);
+
+    // Now assert edits made it in.
+    Result result1 = region.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result1.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
+    }
+
+    // Write data with same timestamp and do not flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE2));
+    }
+    // Now close the region (without flush)
+    region.close(true);
+    wal.shutdown();
+    // split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // reopen the region
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    Result result2 = region2.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result2.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), Bytes.toBytes("x")))); // the later, unflushed write must be the visible one
+    }
+  }
+
+  @Test
+  public void testRecoverSequenceId() throws Exception { // sequence ids seen before close must be seen again after split + reopen
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>(); // row index -> family name -> sequence id of the written cell
+    // Write data and do not flush
+    for (int i = 0; i < countPerFamily; i++) {
+      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), Bytes.toBytes("x"), VALUE1));
+        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
+        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
+        List<Cell> cells = result.listCells();
+        assertEquals(1, cells.size());
+        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
+          cells.get(0).getSequenceId()); // remember the id assigned at write time
+      }
+    }
+
+    // Now close the region (without flush)
+    region.close(true);
+    wal.shutdown();
+    // split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // reopen the region
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    // assert the seqid was recovered
+    for (int i = 0; i < countPerFamily; i++) {
+      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
+        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
+        List<Cell> cells = result.listCells();
+        assertEquals(1, cells.size());
+        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
+          cells.get(0).getSequenceId()); // must equal the id recorded before the region was closed
+      }
+    }
+  }
+
   /**
    * Test writing edits into an HRegion, closing it, splitting logs, opening
    * Region again.  Verify seqids.