You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2020/06/13 16:52:06 UTC

[hbase] branch branch-1 updated: HBASE-24189 WALSplit recreates region dirs for deleted table with recovered edits data.

This is an automated email from the ASF dual-hosted git repository.

anoopsamjohn pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 81ee344  HBASE-24189 WALSplit recreates region dirs for deleted table with recovered edits data.
81ee344 is described below

commit 81ee3444398e266a508689bdec9d353e7290bf05
Author: Anoop Sam John <an...@gmail.com>
AuthorDate: Sat Jun 13 22:21:49 2020 +0530

    HBASE-24189 WALSplit recreates region dirs for deleted table with recovered edits data.
---
 .../apache/hadoop/hbase/util/CommonFSUtils.java    |  18 ++++
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |  83 +++++++++------
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |   4 +
 .../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java |   4 +-
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |   6 +-
 .../wal/TestWALSplitWithDeletedTableData.java      | 111 +++++++++++++++++++++
 6 files changed, 193 insertions(+), 33 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 1225d19..e39a25ab 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -379,6 +379,24 @@ public abstract class CommonFSUtils {
   }
 
   /**
+   * Returns the {@link org.apache.hadoop.fs.Path} object representing the region
+   * directory under path rootdir
+   *
+   * @param rootdir    qualified path of HBase root directory
+   * @param tableName  name of table
+   * @param regionName The encoded region name
+   * @return {@link org.apache.hadoop.fs.Path} for region
+   */
+  public static Path getRegionDir(Path rootdir, TableName tableName, String regionName) {
+    return new Path(getTableDir(rootdir, tableName), regionName);
+  }
+
+  public static Path getWALTableDir(Configuration c, TableName tableName) throws IOException {
+    return new Path(getNamespaceDir(getWALRootDir(c), tableName.getNamespaceAsString()),
+        tableName.getQualifierAsString());
+  }
+
+  /**
    * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
    * the table directory under
    * path rootdir
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 7d9314b..f769e6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -143,7 +144,9 @@ public class WALSplitter {
 
   // Parameters for split process
   protected final Path walDir;
+  protected final Path rootDir;
   protected final FileSystem walFS;
+  protected final FileSystem rootFS;
   protected final Configuration conf;
 
   // Major subcomponents of the split process.
@@ -189,15 +192,17 @@ public class WALSplitter {
   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
 
   @VisibleForTesting
-  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
-      FileSystem walFS, LastSequenceId idChecker,
-      CoordinatedStateManager csm, RecoveryMode mode) {
+  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
+      Path rootDir, FileSystem rootFS, LastSequenceId idChecker, CoordinatedStateManager csm,
+      RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.walDir = walDir;
     this.walFS = walFS;
+    this.rootDir = rootDir;
+    this.rootFS = rootFS;
     this.sequenceIdChecker = idChecker;
     this.csm = (BaseCoordinatedStateManager)csm;
     this.walFactory = factory;
@@ -249,7 +254,10 @@ public class WALSplitter {
   public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
       CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, cp, mode);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    FileSystem rootFS = rootDir.getFileSystem(conf);
+    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker, cp,
+        mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -263,9 +271,11 @@ public class WALSplitter {
         Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
+      Path rootDir = CommonFSUtils.getRootDir(conf);
+      FileSystem rootFS = rootDir.getFileSystem(conf);
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, null, null,
-            RecoveryMode.LOG_SPLITTING);
+        WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, rootDir, rootFS, null,
+            null, RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(walRootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -347,34 +357,45 @@ public class WALSplitter {
         String encodedRegionNameAsStr = Bytes.toString(region);
         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
         if (lastFlushedSequenceId == null) {
-          if (this.distributedLogReplay) {
-            RegionStoreSequenceIds ids =
-                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
-                  encodedRegionNameAsStr);
-            if (ids != null) {
+          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTablename(),
+              encodedRegionNameAsStr))) {
+            // The region directory itself is not present in the FS. This indicates that
+            // region/table is already removed. We can skip all the edits for this region.
+            // Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits will get
+            // skipped by the seqId check below. See more details in HBASE-24189
+            LOG.info(encodedRegionNameAsStr
+                + " no longer available in the FS. Skipping all edits for this region.");
+            lastFlushedSequenceId = Long.MAX_VALUE;
+          } else {
+            if (this.distributedLogReplay) {
+              RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination()
+                  .getRegionFlushedSequenceId(failedServerName, encodedRegionNameAsStr);
+              if (ids != null) {
+                lastFlushedSequenceId = ids.getLastFlushedSequenceId();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+                      + TextFormat.shortDebugString(ids));
+                }
+              }
+            } else if (sequenceIdChecker != null) {
+              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
+              Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+                  Bytes.BYTES_COMPARATOR);
+              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
+                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
+                    storeSeqId.getSequenceId());
+              }
+              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
               if (LOG.isDebugEnabled()) {
-                LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
-                  TextFormat.shortDebugString(ids));
+                LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+                    + TextFormat.shortDebugString(ids));
               }
             }
-          } else if (sequenceIdChecker != null) {
-            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
-            Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
-            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
-              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
-                storeSeqId.getSequenceId());
-            }
-            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
-            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
-                  TextFormat.shortDebugString(ids));
+            if (lastFlushedSequenceId == null) {
+              lastFlushedSequenceId = -1L;
             }
           }
-          if (lastFlushedSequenceId == null) {
-            lastFlushedSequenceId = -1L;
-          }
           lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
         }
         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
@@ -444,6 +465,12 @@ public class WALSplitter {
     return !progress_failed;
   }
 
+  private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
+      throws IOException {
+    Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
+    return this.rootFS.exists(regionDirPath);
+  }
+
   /**
    * Completes the work done by splitLogFile by archiving logs
    * <p>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 4bf211d..31717b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -172,12 +173,15 @@ public class TestWALFactory {
     Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
     final int howmany = 3;
     HRegionInfo[] infos = new HRegionInfo[3];
+    Path tableDataDir = CommonFSUtils.getTableDir(hbaseDir, tableName);
+    fs.mkdirs(tableDataDir);
     Path tabledir = FSUtils.getWALTableDir(conf, tableName);
     fs.mkdirs(tabledir);
     for(int i = 0; i < howmany; i++) {
       infos[i] = new HRegionInfo(tableName,
                 Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
       fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
+      fs.mkdirs(new Path(tableDataDir, infos[i].getEncodedName()));
       LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
     }
     HTableDescriptor htd = new HTableDescriptor(tableName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 613dea3..a5351cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -155,7 +155,7 @@ public class TestWALReaderOnSecureWAL {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
@@ -200,7 +200,7 @@ public class TestWALReaderOnSecureWAL {
         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     Path rootdir = FSUtils.getRootDir(conf);
     try {
-      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+      WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, mode);
       s.splitLogFile(listStatus[0], null);
       Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
         "corrupt");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 153b182..be80d78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -803,7 +803,7 @@ public class TestWALSplit {
     assertTrue("There should be some log greater than size 0.", 0 < largestSize);
     // Set up a splitter that will throw an IOE on the output side
     WALSplitter logSplitter = new WALSplitter(wals,
-        conf, HBASEDIR, fs, null, null, this.mode) {
+        conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
       @Override
       protected Writer createWriter(Path logfile) throws IOException {
         Writer mockWriter = Mockito.mock(Writer.class);
@@ -989,7 +989,7 @@ public class TestWALSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     WALSplitter logSplitter = new WALSplitter(wals,
-        localConf, HBASEDIR, fs, null, null, this.mode) {
+        localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
 
       /* Produce a mock writer that doesn't write anywhere */
       @Override
@@ -1140,7 +1140,7 @@ public class TestWALSplit {
         logfiles != null && logfiles.length > 0);
 
     WALSplitter logSplitter = new WALSplitter(wals,
-        conf, HBASEDIR, fs, null, null, this.mode) {
+        conf, HBASEDIR, fs, HBASEDIR, fs, null, null, this.mode) {
       @Override
       protected Writer createWriter(Path logfile)
           throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java
new file mode 100644
index 0000000..f0aa54d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitWithDeletedTableData.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplitWithDeletedTableData {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testWALSplitWithDeletedTableData() throws Exception {
+    final byte[] CFNAME = Bytes.toBytes("f1");
+    final byte[] QNAME = Bytes.toBytes("q1");
+    final byte[] VALUE = Bytes.toBytes("v1");
+    final TableName t1 = TableName.valueOf("t1");
+    final TableName t2 = TableName.valueOf("t2");
+    final byte[][] splitRows = { Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"),
+        Bytes.toBytes("d") };
+    HTableDescriptor htd = new HTableDescriptor(t1);
+    htd.addFamily(new HColumnDescriptor(CFNAME));
+    Table tab1 = TEST_UTIL.createTable(htd, splitRows);
+    HTableDescriptor htd2 = new HTableDescriptor(t2);
+    htd2.addFamily(new HColumnDescriptor(CFNAME));
+    Table tab2 = TEST_UTIL.createTable(htd2, splitRows);
+    List<Put> puts = new ArrayList<Put>(4);
+    byte[][] rks = { Bytes.toBytes("ac"), Bytes.toBytes("ba"), Bytes.toBytes("ca"),
+        Bytes.toBytes("dd") };
+    for (byte[] rk : rks) {
+      puts.add(new Put(rk).addColumn(CFNAME, QNAME, VALUE));
+    }
+    tab1.put(puts);
+    tab2.put(puts);
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    TEST_UTIL.deleteTable(t1);
+    Path tableDir = CommonFSUtils.getWALTableDir(TEST_UTIL.getConfiguration(), t1);
+    // Dropping table 't1' removed the table directory from the WAL FS completely
+    assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
+    ServerName rs1 = cluster.getRegionServer(1).getServerName();
+    // Kill one RS and wait for the WAL split and replay be over.
+    cluster.killRegionServer(rs1);
+    cluster.waitForRegionServerToStop(rs1, 60 * 1000);
+    assertEquals(1, cluster.hbaseCluster.getLiveRegionServers().size());
+    Thread.sleep(1 * 1000);
+    TEST_UTIL.waitUntilNoRegionsInTransition(60 * 1000);
+    // Table 't1' is dropped. Assert table directory does not exist in WAL FS after WAL split.
+    assertFalse(TEST_UTIL.getDFSCluster().getFileSystem().exists(tableDir));
+    // Assert the table t2 region's data getting replayed after WAL split and available
+    for (byte[] rk : rks) {
+      Result result = tab2.get(new Get(rk));
+      assertFalse(result.isEmpty());
+      Cell cell = result.getColumnLatestCell(CFNAME, QNAME);
+      assertNotNull(cell);
+      assertTrue(CellUtil.matchingValue(cell, VALUE));
+    }
+  }
+}
\ No newline at end of file