You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2016/10/08 00:54:37 UTC

hbase git commit: 1) Fix a resource leak that occurs when an exception is thrown during mob compaction. 2) Reorganize the code in compactMobFilesInBatch() to make it more readable.

Repository: hbase
Updated Branches:
  refs/heads/master 723d56153 -> c7cae6be3


1) Fix a resource leak that occurs when an exception is thrown during mob compaction. 2) Reorganize the code in compactMobFilesInBatch() to make it more readable.

Signed-off-by: Jonathan M Hsieh <jm...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7cae6be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7cae6be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7cae6be

Branch: refs/heads/master
Commit: c7cae6be3dccfaa63033b705ea9845f3f088aab6
Parents: 723d561
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Fri Oct 7 15:47:06 2016 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Oct 7 17:49:27 2016 -0700

----------------------------------------------------------------------
 .../compactions/PartitionedMobCompactor.java    | 157 +++++++++++--------
 .../TestPartitionedMobCompactor.java            |  90 ++++++++++-
 2 files changed, 178 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c7cae6be/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 29b7e8a..33aecc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
     }
     // archive the del files if all the mob files are selected.
     if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
-      LOG.info("After a mob compaction with all files selected, archiving the del files "
-        + newDelPaths);
+      LOG.info(
+          "After a mob compaction with all files selected, archiving the del files " + newDelPaths);
       try {
         MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
       } catch (IOException e) {
@@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
                                       List<StoreFile> filesToCompact, int batch,
                                       Path bulkloadPathOfPartition, Path bulkloadColumnPath,
                                       List<Path> newFiles)
-    throws IOException {
+      throws IOException {
     // open scanner to the selected mob files and del files.
     StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
     // the mob files to be compacted, not include the del files.
@@ -392,62 +392,92 @@ public class PartitionedMobCompactor extends MobCompactor {
     StoreFileWriter writer = null;
     StoreFileWriter refFileWriter = null;
     Path filePath = null;
-    Path refFilePath = null;
     long mobCells = 0;
+    boolean cleanupTmpMobFile = false;
+    boolean cleanupBulkloadDirOfPartition = false;
+    boolean cleanupCommittedMobFile = false;
+    boolean closeReaders= true;
+
     try {
-      writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
-        tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId()
-          .getStartKey(), compactionCacheConfig, cryptoContext);
-      filePath = writer.getPath();
-      byte[] fileName = Bytes.toBytes(filePath.getName());
-      // create a temp file and open a writer for it in the bulkloadPath
-      refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
-        .getSecond().longValue(), compactionCacheConfig, cryptoContext);
-      refFilePath = refFileWriter.getPath();
-      List<Cell> cells = new ArrayList<>();
-      boolean hasMore;
-      ScannerContext scannerContext =
-              ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
-      do {
-        hasMore = scanner.next(cells, scannerContext);
-        for (Cell cell : cells) {
-          // write the mob cell to the mob file.
-          writer.append(cell);
-          // write the new reference cell to the store file.
-          KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
-          refFileWriter.append(reference);
-          mobCells++;
+      try {
+        writer = MobUtils
+            .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
+                Long.MAX_VALUE, column.getCompactionCompressionType(),
+                partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
+        cleanupTmpMobFile = true;
+        filePath = writer.getPath();
+        byte[] fileName = Bytes.toBytes(filePath.getName());
+        // create a temp file and open a writer for it in the bulkloadPath
+        refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
+            fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
+        cleanupBulkloadDirOfPartition = true;
+        List<Cell> cells = new ArrayList<>();
+        boolean hasMore;
+        ScannerContext scannerContext =
+            ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+        do {
+          hasMore = scanner.next(cells, scannerContext);
+          for (Cell cell : cells) {
+            // write the mob cell to the mob file.
+            writer.append(cell);
+            // write the new reference cell to the store file.
+            KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
+            refFileWriter.append(reference);
+            mobCells++;
+          }
+          cells.clear();
+        } while (hasMore);
+      } finally {
+        // close the scanner.
+        scanner.close();
+
+        if (cleanupTmpMobFile) {
+          // append metadata to the mob file, and close the mob file writer.
+          closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
         }
-        cells.clear();
-      } while (hasMore);
+
+        if (cleanupBulkloadDirOfPartition) {
+          // append metadata and bulkload info to the ref mob file, and close the writer.
+          closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+        }
+      }
+
+      if (mobCells > 0) {
+        // commit mob file
+        MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+        cleanupTmpMobFile = false;
+        cleanupCommittedMobFile = true;
+        // bulkload the ref file
+        bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
+        cleanupCommittedMobFile = false;
+        newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+      }
+
+      // archive the old mob files, do not archive the del files.
+      try {
+        closeStoreFileReaders(mobFilesToCompact);
+        closeReaders = false;
+        MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+      } catch (IOException e) {
+        LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+      }
     } finally {
-      // close the scanner.
-      scanner.close();
-      // append metadata to the mob file, and close the mob file writer.
-      closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
-      // append metadata and bulkload info to the ref mob file, and close the writer.
-      closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
-    }
-    if (mobCells > 0) {
-      // commit mob file
-      MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
-      // bulkload the ref file
-      bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
-      newFiles.add(new Path(mobFamilyDir, filePath.getName()));
-    } else {
-      // remove the new files
-      // the mob file is empty, delete it instead of committing.
-      deletePath(filePath);
-      // the ref file is empty, delete it instead of committing.
-      deletePath(refFilePath);
-    }
-    // archive the old mob files, do not archive the del files.
-    try {
-      closeStoreFileReaders(mobFilesToCompact);
-      MobUtils
-        .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
-    } catch (IOException e) {
-      LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+      if (closeReaders) {
+        closeStoreFileReaders(mobFilesToCompact);
+      }
+
+      if (cleanupTmpMobFile) {
+        deletePath(filePath);
+      }
+
+      if (cleanupBulkloadDirOfPartition) {
+        // delete the bulkload files in bulkloadPath
+        deletePath(bulkloadPathOfPartition);
+      }
+
+      if (cleanupCommittedMobFile) {
+        deletePath(new Path(mobFamilyDir, filePath.getName()));
+      }
     }
   }
 
@@ -509,7 +539,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       writer = MobUtils.createDelFileWriter(conf, fs, column,
         MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
         column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
-        cryptoContext);
+          cryptoContext);
       filePath = writer.getPath();
       List<Cell> cells = new ArrayList<>();
       boolean hasMore;
@@ -572,22 +602,15 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @throws IOException if IO failure is encountered
    */
   private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
-                               String fileName)
-    throws IOException {
+      String fileName)
+      throws IOException {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
-      bulkload.doBulkLoad(bulkloadDirectory,
-        connection.getAdmin(),
-        table,
-        connection.getRegionLocator(table.getName()));
+      bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
+          connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
-      // delete the committed mob file
-      deletePath(new Path(mobFamilyDir, fileName));
       throw new IOException(e);
-    } finally {
-      // delete the bulkload files in bulkloadPath
-      deletePath(bulkloadDirectory);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7cae6be/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 7970d62..7da8544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -79,6 +81,9 @@ public class TestPartitionedMobCompactor {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+    // Inject our customized DistributedFileSystem
+    TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
+        DistributedFileSystem.class);
     TEST_UTIL.startMiniCluster(1);
     pool = createThreadPool();
   }
@@ -160,6 +165,51 @@ public class TestPartitionedMobCompactor {
     testCompactDelFilesAtBatchSize(tableName, 4, 2);
   }
 
+  @Test
+  public void testCompactFilesWithDstDirFull() throws Exception {
+    String tableName = "testCompactFilesWithDstDirFull";
+    fs = FileSystem.get(conf);
+    FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
+    Path testDir = FSUtils.getRootDir(conf);
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    basePath = new Path(new Path(mobTestDir, tableName), family);
+
+    try {
+      int count = 2;
+      // create 2 mob files.
+      createStoreFiles(basePath, family, qf, count, Type.Put, true);
+      listFiles();
+
+      TableName tName = TableName.valueOf(tableName);
+      MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
+      faultyFs.setThrowException(true);
+      try {
+        compactor.compact(allFiles, true);
+      } catch (IOException e) {
+        System.out.println("Expected exception, ignore");
+      }
+
+      // Verify that all the files in tmp directory are cleaned up
+      Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+      FileStatus[] ls = faultyFs.listStatus(tempPath);
+
+      // Only .bulkload under this directory
+      assertTrue(ls.length == 1);
+      assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
+
+      Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
+          tName.getNamespaceAsString(), tName.getQualifierAsString())));
+
+      // Nothing in bulkLoad directory
+      FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
+      assertTrue(lsBulkload.length == 0);
+
+    } finally {
+      faultyFs.setThrowException(false);
+    }
+  }
+
+
   private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
       int delfileMaxCount)  throws Exception {
     resetConf();
@@ -289,17 +339,30 @@ public class TestPartitionedMobCompactor {
    */
   private void createStoreFiles(Path basePath, String family, String qualifier, int count,
       Type type) throws IOException {
+    createStoreFiles(basePath, family, qualifier, count, type, false);
+  }
+
+  private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+      Type type, boolean sameStartKey) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
     String startKey = "row_";
     MobFileName mobFileName = null;
     for (int i = 0; i < count; i++) {
-      byte[] startRow = Bytes.toBytes(startKey + i) ;
+      byte[] startRow;
+      if (sameStartKey) {
+        // When creating multiple files under one partition, suffix needs to be different.
+        startRow = Bytes.toBytes(startKey);
+        mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
+        delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+      } else {
+        startRow = Bytes.toBytes(startKey + i);
+      }
       if(type.equals(Type.Delete)) {
         mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
             new Date()), delSuffix);
       }
       if(type.equals(Type.Put)){
-        mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
+        mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
             new Date()), mobSuffix);
       }
       StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
@@ -394,4 +457,27 @@ public class TestPartitionedMobCompactor {
     conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
       MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
   }
+
+  /**
+   * The customized Distributed File System Implementation
+   */
+  static class FaultyDistributedFileSystem extends DistributedFileSystem {
+    private volatile boolean throwException = false;
+
+    public FaultyDistributedFileSystem() {
+      super();
+    }
+
+    public void setThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      if (throwException) {
+        throw new IOException("No more files allowed");
+      }
+      return super.rename(src, dst);
+    }
+  }
 }