Posted to commits@hbase.apache.org by ch...@apache.org on 2022/10/21 09:53:25 UTC

[hbase] branch branch-2.5 updated: HBASE-27433 DefaultMobStoreCompactor should delete MobStoreFile cleanly when compaction is failed (#4840)

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new d4145db6abb HBASE-27433 DefaultMobStoreCompactor should delete MobStoreFile cleanly when compaction is failed (#4840)
d4145db6abb is described below

commit d4145db6abba119e289d3aee50f308fab1d9bcf5
Author: chenglei <ch...@apache.org>
AuthorDate: Fri Oct 21 17:53:19 2022 +0800

    HBASE-27433 DefaultMobStoreCompactor should delete MobStoreFile cleanly when compaction is failed (#4840)
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  49 +++-
 .../hbase/mob/TestMobCompactionWithException.java  | 254 +++++++++++++++++++++
 2 files changed, 295 insertions(+), 8 deletions(-)
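
In short, the patch makes the MOB compactor remember every MOB file it has already committed while rotating writers (switchToNewMobWriter), moves the commit of the last writer inside the try block, and, if the compaction fails, aborts the in-flight writer and deletes the previously committed files (deleteCommittedMobFiles) so a failed compaction no longer leaves orphan MOB files behind. Below is a minimal, self-contained sketch of that track-and-clean-up pattern in plain Java; the class and method names are illustrative only and are not part of the HBase API:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CommittedOutputTracker {
  private final List<Path> committed = new ArrayList<>();

  // Record an output file that has already been committed (closed and moved into place).
  public void recordCommitted(Path file) {
    committed.add(file);
  }

  // On failure, best-effort deletion of everything committed so far.
  public void cleanupAfterFailure() {
    for (Path file : committed) {
      try {
        Files.deleteIfExists(file);
      } catch (IOException e) {
        // Deletion is best-effort: log and continue with the remaining files.
        System.err.println("Failed to delete " + file + ": " + e);
      }
    }
    committed.clear();
  }
}

In the actual patch the same idea is applied to StoreFileWriter paths under the MOB family directory, and deletion failures are only logged, as shown in the diff below.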

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index e46eb0f6209..9c9985ba8d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -354,6 +354,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
 
     Cell mobCell = null;
+    List<String> committedMobWriterFileNames = new ArrayList<>();
     try {
 
       mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
@@ -425,8 +426,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
                     if (len > maxMobFileSize) {
                       LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                         mobFileWriter.getPath().getName(), getStoreInfo());
-                      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
-                      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
+                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
+                        request, committedMobWriterFileNames);
                       fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                       mobCells = 0;
                     }
@@ -469,8 +470,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
                   // file compression yet)
                   long len = mobFileWriter.getPos();
                   if (len > maxMobFileSize) {
-                    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
-                    mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
+                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
+                      request, committedMobWriterFileNames);
                     fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                     mobCells = 0;
                   }
@@ -521,8 +522,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
             if (ioOptimizedMode) {
               long len = mobFileWriter.getPos();
               if (len > maxMobFileSize) {
-                commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
-                mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
+                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
+                  committedMobWriterFileNames);
                 fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                 mobCells = 0;
               }
@@ -561,6 +562,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
         }
         cells.clear();
       } while (hasMore);
+      // Commit last MOB writer
+      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
       finished = true;
     } catch (InterruptedException e) {
       progress.cancel();
@@ -582,11 +585,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
         LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
           mobFileWriter.getPath(), getStoreInfo());
         abortWriter(mobFileWriter);
+        deleteCommittedMobFiles(committedMobWriterFileNames);
       }
     }
 
-    // Commit last MOB writer
-    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
     mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
     mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
     mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
@@ -671,4 +673,35 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     clearThreadLocals();
     return newFiles;
   }
+
+  private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
+    long mobCells, boolean major, CompactionRequestImpl request,
+    List<String> committedMobWriterFileNames) throws IOException {
+    commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
+    committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
+    return newMobWriter(fd, major, request.getWriterCreationTracker());
+  }
+
+  private void deleteCommittedMobFiles(List<String> fileNames) {
+    if (fileNames.isEmpty()) {
+      return;
+    }
+    Path mobColumnFamilyPath =
+      MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
+    for (String fileName : fileNames) {
+      if (fileName == null) {
+        continue;
+      }
+      Path path = new Path(mobColumnFamilyPath, fileName);
+      try {
+        if (store.getFileSystem().exists(path)) {
+          store.getFileSystem().delete(path, false);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
+      }
+    }
+
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java
new file mode 100644
index 00000000000..7aaedf170ac
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.HMobStore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionAsTable;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+public class TestMobCompactionWithException {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMobCompactionWithException.class);
+
+  @Rule
+  public TestName name = new TestName();
+  static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithException.class.getName());
+  private final static HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static Configuration conf = null;
+
+  private HRegion region = null;
+  private TableDescriptor tableDescriptor;
+  private ColumnFamilyDescriptor columnFamilyDescriptor;
+  private FileSystem fs;
+
+  private static final byte[] COLUMN_FAMILY = fam1;
+  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
+  private static volatile boolean testException = false;
+  private static int rowCount = 100;
+  private Table table;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = HTU.getConfiguration();
+    conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
+    conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, MyMobStoreCompactor.class.getName());
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    region.close();
+    this.table.close();
+    fs.delete(HTU.getDataTestDir(), true);
+  }
+
+  private void createTable(long mobThreshold) throws IOException {
+
+    this.columnFamilyDescriptor =
+      ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
+        .setMobThreshold(mobThreshold).setMaxVersions(1).setBlocksize(500).build();
+    this.tableDescriptor =
+      TableDescriptorBuilder.newBuilder(TableName.valueOf(TestMobUtils.getTableName(name)))
+        .setColumnFamily(columnFamilyDescriptor).build();
+    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
+    region = HBaseTestingUtility.createRegionAndWAL(regionInfo, HTU.getDataTestDir(), conf,
+      tableDescriptor, new MobFileCache(conf));
+    this.table = new RegionAsTable(region);
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * This test is for HBASE-27433.
+   */
+  @Test
+  public void testMobStoreFileDeletedWhenCompactException() throws Exception {
+    this.createTable(200);
+    byte[] dummyData = makeDummyData(1000); // larger than mob threshold
+    for (int i = 0; i < rowCount; i++) {
+      Put p = createPut(i, dummyData);
+      table.put(p);
+      region.flush(true);
+    }
+
+    int storeFileCountBeforeCompact = countStoreFiles();
+    int mobFileCountBeforeCompact = countMobFiles();
+    long mobFileByteSize = getMobFileByteSize();
+
+    List<HStore> stores = region.getStores();
+    assertTrue(stores.size() == 1);
+    HMobStore mobStore = (HMobStore) stores.get(0);
+    Compactor<?> compactor = mobStore.getStoreEngine().getCompactor();
+    MyMobStoreCompactor myMobStoreCompactor = (MyMobStoreCompactor) compactor;
+    myMobStoreCompactor.setMobFileMaxByteSize(mobFileByteSize + 100);
+    testException = true;
+    try {
+      try {
+
+        // Force major compaction
+        mobStore.triggerMajorCompaction();
+        Optional<CompactionContext> context = mobStore.requestCompaction(HStore.PRIORITY_USER,
+          CompactionLifeCycleTracker.DUMMY, User.getCurrent());
+        assertTrue(context.isPresent());
+        region.compact(context.get(), mobStore, NoLimitThroughputController.INSTANCE,
+          User.getCurrent());
+
+        fail();
+      } catch (IOException e) {
+        assertTrue(e != null);
+      }
+    } finally {
+      testException = false;
+    }
+
+    // When compaction fails, the count of StoreFiles and MobStoreFiles should be the same as
+    // before compaction.
+    assertEquals("After compaction: store files", storeFileCountBeforeCompact, countStoreFiles());
+    assertEquals("After compaction: mob file count", mobFileCountBeforeCompact, countMobFiles());
+  }
+
+  private int countStoreFiles() throws IOException {
+    HStore store = region.getStore(COLUMN_FAMILY);
+    return store.getStorefilesCount();
+  }
+
+  private int countMobFiles() throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
+      columnFamilyDescriptor.getNameAsString());
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
+      return files.length;
+    }
+    return 0;
+  }
+
+  private long getMobFileByteSize() throws IOException {
+    Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
+      columnFamilyDescriptor.getNameAsString());
+    if (fs.exists(mobDirPath)) {
+      FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
+      if (files.length > 0) {
+        return files[0].getLen();
+      }
+    }
+    return 0;
+  }
+
+  private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
+    Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
+    p.setDurability(Durability.SKIP_WAL);
+    p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
+    return p;
+  }
+
+  private byte[] makeDummyData(int size) {
+    byte[] dummyData = new byte[size];
+    Bytes.random(dummyData);
+    return dummyData;
+  }
+
+  public static class MyMobStoreCompactor extends DefaultMobStoreCompactor {
+    public MyMobStoreCompactor(Configuration conf, HStore store) {
+      super(conf, store);
+
+    }
+
+    public void setMobFileMaxByteSize(long maxByteSize) {
+      this.conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, maxByteSize);
+    }
+
+    @Override
+    protected boolean performCompaction(FileDetails fd, final InternalScanner scanner,
+      CellSink writer, long smallestReadPoint, boolean cleanSeqId,
+      ThroughputController throughputController, CompactionRequestImpl request,
+      CompactionProgress progress) throws IOException {
+
+      InternalScanner wrappedScanner = new InternalScanner() {
+
+        private int count = -1;
+
+        @Override
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+          count++;
+          if (count == rowCount - 1 && testException) {
+            count = 0;
+            throw new IOException("Inject Error");
+          }
+          return scanner.next(result, scannerContext);
+        }
+
+        @Override
+        public void close() throws IOException {
+          scanner.close();
+        }
+      };
+      return super.performCompaction(fd, wrappedScanner, writer, smallestReadPoint, cleanSeqId,
+        throughputController, request, progress);
+    }
+  }
+}
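
For reference, the new test injects the failure by wrapping the compaction scanner so that it throws an IOException near the end of the scan, after several MOB writers have already been committed, and then asserts that the store file and MOB file counts are unchanged. A generic, HBase-independent sketch of that kind of fault-injection wrapper (illustrative names only, not the HBase API) might look like this:

import java.io.IOException;
import java.util.Iterator;

// Wraps an iterator and fails after a configurable number of elements,
// simulating a mid-stream error in a long-running job such as a compaction.
public class FailingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final int failAfter;
  private int count = 0;

  public FailingIterator(Iterator<T> delegate, int failAfter) {
    this.delegate = delegate;
    this.failAfter = failAfter;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    if (++count > failAfter) {
      // Iterator.next cannot throw a checked IOException, so wrap it.
      throw new RuntimeException(new IOException("Injected error"));
    }
    return delegate.next();
  }
}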