You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2016/10/08 00:54:37 UTC
hbase git commit: 1). Fix resource leak issue upon exception during
mob compaction. 2). Reorg the code in compactMobFilesInBatch() to make it
more readable.
Repository: hbase
Updated Branches:
refs/heads/master 723d56153 -> c7cae6be3
1). Fix resource leak issue upon exception during mob compaction. 2). Reorg the code in compactMobFilesInBatch() to make it more readable.
Signed-off-by: Jonathan M Hsieh <jm...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7cae6be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7cae6be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7cae6be
Branch: refs/heads/master
Commit: c7cae6be3dccfaa63033b705ea9845f3f088aab6
Parents: 723d561
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Fri Oct 7 15:47:06 2016 -0700
Committer: Jonathan M Hsieh <jm...@apache.org>
Committed: Fri Oct 7 17:49:27 2016 -0700
----------------------------------------------------------------------
.../compactions/PartitionedMobCompactor.java | 157 +++++++++++--------
.../TestPartitionedMobCompactor.java | 90 ++++++++++-
2 files changed, 178 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7cae6be/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 29b7e8a..33aecc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
}
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
- LOG.info("After a mob compaction with all files selected, archiving the del files "
- + newDelPaths);
+ LOG.info(
+ "After a mob compaction with all files selected, archiving the del files " + newDelPaths);
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
@@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> filesToCompact, int batch,
Path bulkloadPathOfPartition, Path bulkloadColumnPath,
List<Path> newFiles)
- throws IOException {
+ throws IOException {
// open scanner to the selected mob files and del files.
StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
// the mob files to be compacted, not include the del files.
@@ -392,62 +392,92 @@ public class PartitionedMobCompactor extends MobCompactor {
StoreFileWriter writer = null;
StoreFileWriter refFileWriter = null;
Path filePath = null;
- Path refFilePath = null;
long mobCells = 0;
+ boolean cleanupTmpMobFile = false;
+ boolean cleanupBulkloadDirOfPartition = false;
+ boolean cleanupCommittedMobFile = false;
+ boolean closeReaders= true;
+
try {
- writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
- tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId()
- .getStartKey(), compactionCacheConfig, cryptoContext);
- filePath = writer.getPath();
- byte[] fileName = Bytes.toBytes(filePath.getName());
- // create a temp file and open a writer for it in the bulkloadPath
- refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
- .getSecond().longValue(), compactionCacheConfig, cryptoContext);
- refFilePath = refFileWriter.getPath();
- List<Cell> cells = new ArrayList<>();
- boolean hasMore;
- ScannerContext scannerContext =
- ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
- do {
- hasMore = scanner.next(cells, scannerContext);
- for (Cell cell : cells) {
- // write the mob cell to the mob file.
- writer.append(cell);
- // write the new reference cell to the store file.
- KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
- refFileWriter.append(reference);
- mobCells++;
+ try {
+ writer = MobUtils
+ .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
+ Long.MAX_VALUE, column.getCompactionCompressionType(),
+ partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
+ cleanupTmpMobFile = true;
+ filePath = writer.getPath();
+ byte[] fileName = Bytes.toBytes(filePath.getName());
+ // create a temp file and open a writer for it in the bulkloadPath
+ refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
+ fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
+ cleanupBulkloadDirOfPartition = true;
+ List<Cell> cells = new ArrayList<>();
+ boolean hasMore;
+ ScannerContext scannerContext =
+ ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+ do {
+ hasMore = scanner.next(cells, scannerContext);
+ for (Cell cell : cells) {
+ // write the mob cell to the mob file.
+ writer.append(cell);
+ // write the new reference cell to the store file.
+ KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
+ refFileWriter.append(reference);
+ mobCells++;
+ }
+ cells.clear();
+ } while (hasMore);
+ } finally {
+ // close the scanner.
+ scanner.close();
+
+ if (cleanupTmpMobFile) {
+ // append metadata to the mob file, and close the mob file writer.
+ closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
}
- cells.clear();
- } while (hasMore);
+
+ if (cleanupBulkloadDirOfPartition) {
+ // append metadata and bulkload info to the ref mob file, and close the writer.
+ closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
+ }
+ }
+
+ if (mobCells > 0) {
+ // commit mob file
+ MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
+ cleanupTmpMobFile = false;
+ cleanupCommittedMobFile = true;
+ // bulkload the ref file
+ bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
+ cleanupCommittedMobFile = false;
+ newFiles.add(new Path(mobFamilyDir, filePath.getName()));
+ }
+
+ // archive the old mob files, do not archive the del files.
+ try {
+ closeStoreFileReaders(mobFilesToCompact);
+ closeReaders = false;
+ MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
+ } catch (IOException e) {
+ LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+ }
} finally {
- // close the scanner.
- scanner.close();
- // append metadata to the mob file, and close the mob file writer.
- closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
- // append metadata and bulkload info to the ref mob file, and close the writer.
- closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
- }
- if (mobCells > 0) {
- // commit mob file
- MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
- // bulkload the ref file
- bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
- newFiles.add(new Path(mobFamilyDir, filePath.getName()));
- } else {
- // remove the new files
- // the mob file is empty, delete it instead of committing.
- deletePath(filePath);
- // the ref file is empty, delete it instead of committing.
- deletePath(refFilePath);
- }
- // archive the old mob files, do not archive the del files.
- try {
- closeStoreFileReaders(mobFilesToCompact);
- MobUtils
- .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
- } catch (IOException e) {
- LOG.error("Failed to archive the files " + mobFilesToCompact, e);
+ if (closeReaders) {
+ closeStoreFileReaders(mobFilesToCompact);
+ }
+
+ if (cleanupTmpMobFile) {
+ deletePath(filePath);
+ }
+
+ if (cleanupBulkloadDirOfPartition) {
+ // delete the bulkload files in bulkloadPath
+ deletePath(bulkloadPathOfPartition);
+ }
+
+ if (cleanupCommittedMobFile) {
+ deletePath(new Path(mobFamilyDir, filePath.getName()));
+ }
}
}
@@ -509,7 +539,7 @@ public class PartitionedMobCompactor extends MobCompactor {
writer = MobUtils.createDelFileWriter(conf, fs, column,
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
- cryptoContext);
+ cryptoContext);
filePath = writer.getPath();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
@@ -572,22 +602,15 @@ public class PartitionedMobCompactor extends MobCompactor {
* @throws IOException if IO failure is encountered
*/
private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
- String fileName)
- throws IOException {
+ String fileName)
+ throws IOException {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
- bulkload.doBulkLoad(bulkloadDirectory,
- connection.getAdmin(),
- table,
- connection.getRegionLocator(table.getName()));
+ bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
+ connection.getRegionLocator(table.getName()));
} catch (Exception e) {
- // delete the committed mob file
- deletePath(new Path(mobFamilyDir, fileName));
throw new IOException(e);
- } finally {
- // delete the bulkload files in bulkloadPath
- deletePath(bulkloadDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c7cae6be/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 7970d62..7da8544 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -79,6 +81,9 @@ public class TestPartitionedMobCompactor {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
+ // Inject our customized DistributedFileSystem
+ TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
+ DistributedFileSystem.class);
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool();
}
@@ -160,6 +165,51 @@ public class TestPartitionedMobCompactor {
testCompactDelFilesAtBatchSize(tableName, 4, 2);
}
+ @Test
+ public void testCompactFilesWithDstDirFull() throws Exception {
+ String tableName = "testCompactFilesWithDstDirFull";
+ fs = FileSystem.get(conf);
+ FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
+ Path testDir = FSUtils.getRootDir(conf);
+ Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+ basePath = new Path(new Path(mobTestDir, tableName), family);
+
+ try {
+ int count = 2;
+ // create 2 mob files.
+ createStoreFiles(basePath, family, qf, count, Type.Put, true);
+ listFiles();
+
+ TableName tName = TableName.valueOf(tableName);
+ MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
+ faultyFs.setThrowException(true);
+ try {
+ compactor.compact(allFiles, true);
+ } catch (IOException e) {
+ System.out.println("Expected exception, ignore");
+ }
+
+ // Verify that all the files in tmp directory are cleaned up
+ Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
+ FileStatus[] ls = faultyFs.listStatus(tempPath);
+
+ // Only .bulkload under this directory
+ assertTrue(ls.length == 1);
+ assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
+
+ Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
+ tName.getNamespaceAsString(), tName.getQualifierAsString())));
+
+ // Nothing in bulkLoad directory
+ FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
+ assertTrue(lsBulkload.length == 0);
+
+ } finally {
+ faultyFs.setThrowException(false);
+ }
+ }
+
+
private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
int delfileMaxCount) throws Exception {
resetConf();
@@ -289,17 +339,30 @@ public class TestPartitionedMobCompactor {
*/
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type) throws IOException {
+ createStoreFiles(basePath, family, qualifier, count, type, false);
+ }
+
+ private void createStoreFiles(Path basePath, String family, String qualifier, int count,
+ Type type, boolean sameStartKey) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
String startKey = "row_";
MobFileName mobFileName = null;
for (int i = 0; i < count; i++) {
- byte[] startRow = Bytes.toBytes(startKey + i) ;
+ byte[] startRow;
+ if (sameStartKey) {
+ // When creating multiple files under one partition, suffix needs to be different.
+ startRow = Bytes.toBytes(startKey);
+ mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
+ delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
+ } else {
+ startRow = Bytes.toBytes(startKey + i);
+ }
if(type.equals(Type.Delete)) {
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), delSuffix);
}
if(type.equals(Type.Put)){
- mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
+ mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), mobSuffix);
}
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
@@ -394,4 +457,27 @@ public class TestPartitionedMobCompactor {
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
}
+
+ /**
+ * The customized Distributed File System Implementation
+ */
+ static class FaultyDistributedFileSystem extends DistributedFileSystem {
+ private volatile boolean throwException = false;
+
+ public FaultyDistributedFileSystem() {
+ super();
+ }
+
+ public void setThrowException(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ if (throwException) {
+ throw new IOException("No more files allowed");
+ }
+ return super.rename(src, dst);
+ }
+ }
}