You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/14 20:18:24 UTC
svn commit: r1542033 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Thu Nov 14 19:18:23 2013
New Revision: 1542033
URL: http://svn.apache.org/r1542033
Log:
[0.89-fb] [HBASE-9967] delete store files when a flush aborts
Author: pervyshev
Test Plan: unit test
Reviewers: aaiyer, liyintang, manukranthk, rshroff
Reviewed By: aaiyer
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1049445
Task ID: 3111537
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 14 19:18:23 2013
@@ -177,10 +177,10 @@ public class HRegion implements HeapSize
// private byte [] name = null;
protected final AtomicLong memstoreSize = new AtomicLong(0);
-
+
// The number of rows are read
protected final AtomicInteger rowReadCnt = new AtomicInteger(0);
-
+
// The number of rows are updated
protected final AtomicInteger rowUpdateCnt = new AtomicInteger(0);
@@ -1576,6 +1576,9 @@ public class HRegion implements HeapSize
status.abort("Flush failed: " + StringUtils.stringifyException(ioe));
// The caller can recover from this IOException. No harm done if
// memstore flush fails.
+ for (StoreFlusher flusher : storeFlushers) {
+ flusher.cancel();
+ }
throw ioe;
}
@@ -1917,7 +1920,7 @@ public class HRegion implements HeapSize
public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
long now = EnvironmentEdgeManager.currentTimeMillis();
-
+
byte [] byteNow = Bytes.toBytes(now);
boolean flush = false;
@@ -2484,9 +2487,9 @@ public class HRegion implements HeapSize
long seqNum) {
// Increment the rowUpdatedCnt
this.rowUpdateCnt.incrementAndGet();
-
+
long start = EnvironmentEdgeManager.currentTimeMillis();
-
+
MultiVersionConsistencyControl.WriteEntry w = null;
long size = 0;
try {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 14 19:18:23 2013
@@ -722,12 +722,13 @@ public class Store extends SchemaConfigu
private StoreFile flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot,
TimeRangeTracker snapshotTimeRangeTracker,
- MonitoredTask status) throws IOException {
+ MonitoredTask status,
+ CacheFlushInfo info) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
return internalFlushCache(snapshot, logCacheFlushId,
- snapshotTimeRangeTracker, status);
+ snapshotTimeRangeTracker, status, info);
}
/*
@@ -741,7 +742,8 @@ public class Store extends SchemaConfigu
private StoreFile internalFlushCache(final SortedSet<KeyValue> snapshot,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker,
- MonitoredTask status) throws IOException {
+ MonitoredTask status,
+ CacheFlushInfo info) throws IOException {
StoreFile.Writer writer;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
@@ -770,6 +772,7 @@ public class Store extends SchemaConfigu
status.setStatus("Flushing " + this + ": creating writer");
// A. Write the map out to the disk
writer = createWriterInTmp(snapshot.size(), this.compression, false);
+ info.tmpPath = writer.getPath();
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
fileName = writer.getPath().getName();
try {
@@ -801,6 +804,7 @@ public class Store extends SchemaConfigu
writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
writer.close();
+ InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE, writer.getPath());
}
}
} finally {
@@ -814,28 +818,58 @@ public class Store extends SchemaConfigu
// Write-out finished successfully, move into the right spot
LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
fs.rename(writer.getPath(), dstPath);
+ InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_RENAME, writer.getPath(), dstPath);
+ info.dstPath = dstPath;
StoreFile sf = new StoreFile(this.fs, dstPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
passSchemaMetricsTo(sf);
StoreFile.Reader r = sf.createReader();
- this.storeSize += r.length();
+ info.sequenceId = logCacheFlushId;
+ info.entries = r.getEntries();
+ info.memSize = flushed;
+ info.fileSize = r.length();
+ return sf;
+ }
+
+ private boolean commit(StoreFile storeFile, SortedSet<KeyValue> snapshot, CacheFlushInfo info)
+ throws IOException {
+ this.storeSize += info.fileSize;
// This increments the metrics associated with total flushed bytes for this
// family. The overall flush count is stored in the static metrics and
// retrieved from HRegion.recentFlushes, which is set within
// HRegion.internalFlushcache, which indirectly calls this to actually do
// the flushing through the StoreFlusherImpl class
getSchemaMetrics().updatePersistentStoreMetric(
- SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushed);
+ SchemaMetrics.StoreMetricType.FLUSH_SIZE, info.memSize);
if (LOG.isInfoEnabled()) {
- LOG.info("Added " + sf + ", entries=" + r.getEntries() +
- ", sequenceid=" + logCacheFlushId +
- ", memsize=" + StringUtils.humanReadableInt(flushed) +
- ", filesize=" + StringUtils.humanReadableInt(r.length()) +
+ LOG.info("Added " + storeFile + ", entries=" + info.entries +
+ ", sequenceid=" + info.sequenceId +
+ ", memsize=" + StringUtils.humanReadableInt(info.memSize) +
+ ", filesize=" + StringUtils.humanReadableInt(info.fileSize) +
" to " + this.region.regionInfo.getRegionNameAsString());
}
- return sf;
+ // Add new file to store files. Clear snapshot too while we have
+ // the Store write lock.
+ return updateStorefiles(storeFile, snapshot);
+ }
+
+ /**
+  * Best-effort cleanup after an aborted flush: deletes the temporary file and,
+  * if the rename already happened, the renamed store file recorded in
+  * {@code info}. A failure to delete one file does not prevent the attempt on
+  * the other and is never propagated to the caller; it is logged instead so
+  * that a leaked file can be found.
+  *
+  * @param info paths recorded while the flush was running; either path may be
+  *          null if the flush did not get that far
+  */
+ private void cancel(CacheFlushInfo info) {
+   for (Path path : new Path[] { info.tmpPath, info.dstPath }) {
+     if (path == null) {
+       continue;
+     }
+     try {
+       // The return value is deliberately ignored: after a successful rename
+       // the temporary path no longer exists and delete() reports false.
+       fs.delete(path, false);
+     } catch (IOException e) {
+       // Still best-effort, but leave a trace instead of swallowing silently.
+       LOG.warn("Failed to delete " + path + " while cancelling flush of " + this, e);
+     }
+   }
+ }
/*
@@ -1949,6 +1983,16 @@ public class Store extends SchemaConfigu
return new StoreFlusherImpl(cacheFlushId);
}
+ /**
+  * Mutable record of what a single flush of this store has produced so far.
+  * It is filled in step by step while the flush runs, then read back either
+  * by commit (metrics and logging) or by cancel (file cleanup).
+  *
+  * Declared static because it holds plain data and never touches the
+  * enclosing Store instance.
+  */
+ private static class CacheFlushInfo {
+   /** Path of the temporary file; set as soon as the writer is created. */
+   Path tmpPath;
+   /** Final store file path; set only after the temporary file was renamed. */
+   Path dstPath;
+
+   /** Entry count reported by the reader of the flushed file. */
+   long entries;
+   /** Log cache flush id the flush was started with. */
+   long sequenceId;
+   /** Flushed size, reported as "memsize" and fed to the FLUSH_SIZE metric. */
+   long memSize;
+   /** Length reported by the reader of the flushed file; added to storeSize on commit. */
+   long fileSize;
+ }
+
private class StoreFlusherImpl implements StoreFlusher {
private long cacheFlushId;
@@ -1956,6 +2000,8 @@ public class Store extends SchemaConfigu
private StoreFile storeFile;
private TimeRangeTracker snapshotTimeRangeTracker;
+ CacheFlushInfo cacheFlushInfo;
+
private StoreFlusherImpl(long cacheFlushId) {
this.cacheFlushId = cacheFlushId;
}
@@ -1965,12 +2011,13 @@ public class Store extends SchemaConfigu
memstore.snapshot();
this.snapshot = memstore.getSnapshot();
this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
+ this.cacheFlushInfo = new CacheFlushInfo();
}
@Override
public void flushCache(MonitoredTask status) throws IOException {
storeFile = Store.this.flushCache(cacheFlushId, snapshot,
- snapshotTimeRangeTracker, status);
+ snapshotTimeRangeTracker, status, cacheFlushInfo);
}
@Override
@@ -1978,9 +2025,12 @@ public class Store extends SchemaConfigu
if (storeFile == null) {
return false;
}
- // Add new file to store files. Clear snapshot too while we have
- // the Store write lock.
- return Store.this.updateStorefiles(storeFile, snapshot);
+ return Store.this.commit(storeFile, snapshot, cacheFlushInfo);
+ }
+
+ @Override
+ public void cancel() {
+ Store.this.cancel(cacheFlushInfo);
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Thu Nov 14 19:18:23 2013
@@ -63,4 +63,9 @@ interface StoreFlusher {
*/
boolean commit() throws IOException;
+ /**
+ * Cancel the flush: remove the files this flush has already written to the
+ * file system. Meant to be called when a flush aborts, so that no store
+ * files are left behind.
+ */
+ void cancel();
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Thu Nov 14 19:18:23 2013
@@ -39,5 +39,7 @@ public enum InjectionEvent {
// Injection into Store.java
READONLYSTORE_COMPACTION_WHILE_SNAPSHOTTING,
- STORESCANNER_COMPACTION_RACE
+ STORESCANNER_COMPACTION_RACE,
+ STOREFILE_AFTER_WRITE_CLOSE,
+ STOREFILE_AFTER_RENAME
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1542033&r1=1542032&r2=1542033&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Nov 14 19:18:23 2013
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +68,8 @@ import org.apache.hadoop.hbase.util.Envi
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -1468,7 +1472,7 @@ public class TestHRegion extends HBaseTe
// Setting up region
String method = this.getName();
byte[] tableName = Bytes.toBytes("testtableNextRows");
- byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"),
+ byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"),
Bytes.toBytes("rows3")};
byte[][] families = { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"),
Bytes.toBytes("fam3"), Bytes.toBytes("fam4") };
@@ -1478,8 +1482,8 @@ public class TestHRegion extends HBaseTe
List<KeyValue> expected = new ArrayList<KeyValue>();
fillTable(rows, families, 2, expected);
/**
- * in this case we know kv size = 28
- * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE
+ * in this case we know kv size = 28
+ * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE
* --4-|--4-|--2---|---4---|--1--|--4---|-8-|--1-- ===> 28 bytes
*/
Scan scan = new Scan();
@@ -1488,23 +1492,23 @@ public class TestHRegion extends HBaseTe
scan.addFamily(families[3]);
// fetch one kv even when responseSize = 0, oh well, this's the semantic
- // that users should be aware of
+ // that users should be aware of
compareNextRows(scan, 0, true, Integer.MAX_VALUE, expected.subList(0, 1));
// fetch the last kv pair if the responseSize is not big enough
compareNextRows(scan, 1, true, Integer.MAX_VALUE, expected.subList(0, 1));
- // maxResponseSize perfectly fits one kv
+ // maxResponseSize perfectly fits one kv
compareNextRows(scan, 28, true, Integer.MAX_VALUE, expected.subList(0, 1));
- // if partialRow == true, fetch as much as maxResponseSize allows
+ // if partialRow == true, fetch as much as maxResponseSize allows
compareNextRows(scan, 29, true, Integer.MAX_VALUE, expected.subList(0, 2));
- // if partialRow == false, fetch the entire row
+ // if partialRow == false, fetch the entire row
compareNextRows(scan, 29, false, Integer.MAX_VALUE, expected.subList(0, 6));
-
+
// fetch everything in the table as long as responseSize is big enough
compareNextRows(scan, 10000, true, Integer.MAX_VALUE, expected);
compareNextRows(scan, 10000, false, Integer.MAX_VALUE, expected);
-
- // check nbRows
+
+ // check nbRows
// fetch two rows, each has two columns and each column has 3 kvs
compareNextRows(scan, 10000, true, 2, expected.subList(0, 12));
compareNextRows(scan, 10000, false, 2, expected.subList(0, 12));
@@ -2977,6 +2981,72 @@ public class TestHRegion extends HBaseTe
assertTrue(keyValues.length == 0);
}
+ /**
+  * Checks that a flush which aborts leaves no store files on the file system.
+  * One row is written to three families, then an injection handler makes the
+  * second STOREFILE_AFTER_WRITE_CLOSE event throw an IOException. Every path
+  * the handler saw (written or renamed) must be gone after the failed flush.
+  */
+ public void testRemoveStoreFilesOnWriteFailure()
+ throws IOException {
+   byte[] table = Bytes.toBytes("table");
+   byte[][] families = new byte[][] {
+     Bytes.toBytes("family1"),
+     Bytes.toBytes("family2"),
+     Bytes.toBytes("family3")
+   };
+   initHRegion(table, getName(), families);
+
+   Put put = new Put(Bytes.toBytes("row"));
+   put.add(families[0], null, null);
+   put.add(families[1], null, null);
+   put.add(families[2], null, null);
+   region.put(put);
+
+   class InjectionHandlerImpl extends InjectionHandler {
+
+     // Every file path reported by the store during the flush.
+     private Set<Path> paths = new HashSet<Path>();
+
+     // Number of STOREFILE_AFTER_WRITE_CLOSE events seen so far.
+     private int writeCount = 0;
+
+     protected void _processEventIO(InjectionEvent event, Object... args)
+     throws IOException {
+       switch (event) {
+       case STOREFILE_AFTER_WRITE_CLOSE:
+       {
+         paths.add((Path) args[0]);
+         // Fail on the second file so that an earlier file has already
+         // been written when the flush aborts.
+         if (++writeCount == 2) {
+           throw new IOException("injected failure after store file write");
+         }
+         break;
+       }
+       case STOREFILE_AFTER_RENAME:
+       {
+         // args[1] is the destination path of the rename.
+         paths.add((Path) args[1]);
+         break;
+       }
+       }
+     }
+
+     public void validate()
+     throws IOException {
+       for (Path path : paths) {
+         assertFalse("file should not exist: " + path, region.fs.exists(path));
+       }
+     }
+
+   }
+
+   InjectionHandlerImpl ih = new InjectionHandlerImpl();
+   InjectionHandler.set(ih);
+
+   try {
+     try {
+       region.flushcache();
+       fail("flush should have failed with the injected IOException");
+     }
+     catch (IOException e) {
+       // that's expected
+     }
+
+     ih.validate();
+   } finally {
+     // Always unregister the handler: fail() and validate() throw on a
+     // failing run, and a leaked handler would inject faults into later tests.
+     InjectionHandler.clear();
+   }
+ }
+
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {