Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/13 22:24:54 UTC
svn commit: r1183070 - in /hbase/branches/0.92: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Author: nspiegelberg
Date: Thu Oct 13 20:24:53 2011
New Revision: 1183070
URL: http://svn.apache.org/viewvc?rev=1183070&view=rev
Log:
HBASE-4078 Validate store files after flush/compaction
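In short, the change adds a validateStoreFile() helper to Store.java and calls it from both the flush path and completeCompaction(), just before the newly written file is renamed out of the store's tmp directory. The helper simply opens a reader on the file and closes it again; any IOException is rethrown, so a corrupt file never leaves tmp. A condensed sketch of the helper follows (all identifiers are taken from the diff below; this is a summary of the patch, not new API):

    // Sketch only: condensed from the validateStoreFile() helper added below.
    private void validateStoreFile(Path path) throws IOException {
      StoreFile storeFile = null;
      try {
        // Constructing the StoreFile and creating its reader parses the HFile
        // trailer and metadata; a corrupt or truncated file fails here.
        storeFile = new StoreFile(this.fs, path, this.conf,
            this.cacheConf, this.family.getBloomFilterType());
        storeFile.createReader();
      } catch (IOException e) {
        LOG.error("Failed to open store file : " + path
            + ", keeping it in tmp location", e);
        throw e;   // caller never renames the file, so it stays in tmp
      } finally {
        if (storeFile != null) {
          storeFile.closeReader();
        }
      }
    }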
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Thu Oct 13 20:24:53 2011
@@ -335,6 +335,7 @@ Release 0.92.0 - Unreleased
output file (Mingjie Lai)
HBASE-4582 Store.java cleanup (failing TestHeapSize and has warnings)
HBASE-4556 Fix all incorrect uses of InternalScanner.next(...) (Lars H)
+ HBASE-4078 Validate store files after flush/compaction
TESTS
HBASE-4492 TestRollingRestart fails intermittently
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Oct 13 20:24:53 2011
@@ -520,6 +520,7 @@ public class Store implements HeapSize {
// Write-out finished successfully, move into the right spot
Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+ validateStoreFile(writer.getPath());
String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
LOG.info(msg);
status.setStatus("Flushing " + this + ": " + msg);
@@ -1091,7 +1092,7 @@ public class Store implements HeapSize {
* nothing made it through the compaction.
* @throws IOException
*/
- private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
+ StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
final boolean majorCompaction, final long maxId)
throws IOException {
// calculate maximum key count after compaction (for blooms)
@@ -1193,6 +1194,30 @@ public class Store implements HeapSize {
return writer;
}
+ /**
+ * Validates a store file by opening and closing it. In HFileV2 this should
+ * not be an expensive operation.
+ *
+ * @param path the path to the store file
+ */
+ private void validateStoreFile(Path path)
+ throws IOException {
+ StoreFile storeFile = null;
+ try {
+ storeFile = new StoreFile(this.fs, path, this.conf,
+ this.cacheConf, this.family.getBloomFilterType());
+ storeFile.createReader();
+ } catch (IOException e) {
+ LOG.error("Failed to open store file : " + path
+ + ", keeping it in tmp location", e);
+ throw e;
+ } finally {
+ if (storeFile != null) {
+ storeFile.closeReader();
+ }
+ }
+ }
+
/*
* <p>It works by processing a compaction that's been written to disk.
*
@@ -1212,13 +1237,14 @@ public class Store implements HeapSize {
* @return StoreFile created. May be null.
* @throws IOException
*/
- private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
+ StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
final StoreFile.Writer compactedFile)
throws IOException {
// 1. Moving the new files into place -- if there is a new file (may not
// be if all cells were expired or deleted).
StoreFile result = null;
if (compactedFile != null) {
+ validateStoreFile(compactedFile.getPath());
Path p = null;
try {
p = StoreFile.rename(this.fs, compactedFile.getPath(),
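From a caller's point of view, the only observable change is that completeCompaction() (and likewise the flush) can now fail with an IOException before any rename happens, leaving the unvalidated output in the store's tmp directory and the original store files untouched. A hedged sketch of what that means for a caller; the try/catch and the variable names (store, filesToCompact, compactedFile) are illustrative, not code from this commit:

    try {
      store.completeCompaction(filesToCompact, compactedFile);
    } catch (IOException ioe) {
      // The corrupt output is still under the store's tmp directory and the
      // original store files are untouched, so the compaction can be retried.
      LOG.warn("Compaction output failed validation, leaving it in tmp", ioe);
    }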
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1183070&r1=1183069&r2=1183070&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Oct 13 20:24:53 2011
@@ -24,14 +24,19 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.junit.Assert.fail;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
@@ -44,6 +49,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -478,4 +484,41 @@ public class TestCompaction extends HBas
"bbb").getBytes(), null);
loader.flushcache();
}
+
+ public void testCompactionWithCorruptResult() throws Exception {
+ int nfiles = 10;
+ for (int i = 0; i < nfiles; i++) {
+ createStoreFile(r);
+ }
+ Store store = r.getStore(COLUMN_FAMILY);
+
+ List<StoreFile> storeFiles = store.getStorefiles();
+ long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+
+ StoreFile.Writer compactedFile = store.compactStore(storeFiles, false, maxId);
+
+ // Now let's corrupt the compacted file.
+ FileSystem fs = cluster.getFileSystem();
+ Path origPath = compactedFile.getPath();
+ Path homedir = store.getHomedir();
+ Path dstPath = new Path(homedir, origPath.getName());
+ FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
+ (long) 1024,
+ null);
+ stream.writeChars("CORRUPT FILE!!!!");
+ stream.close();
+
+ try {
+ store.completeCompaction(storeFiles, compactedFile);
+ } catch (Exception e) {
+ // The complete compaction should fail and the corrupt file should remain
+ // in the 'tmp' directory.
+ assertTrue(fs.exists(origPath));
+ assertTrue(!fs.exists(dstPath));
+ System.out.println("testCompactionWithCorruptResult Passed");
+ return;
+ }
+ fail("testCompactionWithCorruptResult failed since no exception was" +
+ "thrown while completing a corrupt file");
+ }
}