You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/02/12 20:18:34 UTC
svn commit: r1567725 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Wed Feb 12 19:18:33 2014
New Revision: 1567725
URL: http://svn.apache.org/r1567725
Log:
[master] Force double flushes in HRegion.close() to prevent data loss
Author: fan
Summary: Force double flushes in HRegion.close() to prevent data loss.
Test Plan: A new unit test to verify region close when store's snapshot is not empty.
Reviewers: liyintang, gauravm, adela, aaiyer, jiqingt, manukranthk
Reviewed By: aaiyer
CC: hbase-dev@
Differential Revision: https://phabricator.fb.com/D1163424
Task ID: 3621385
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Feb 12 19:18:33 2014
@@ -845,12 +845,10 @@ public class HRegion implements HeapSize
status.setStatus("Waiting for split lock");
synchronized (splitLock) {
status.setStatus("Disabling compacts and flushes for region");
- boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
- wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
while (writestate.compacting > 0 || writestate.flushing) {
LOG.debug("waiting for " + writestate.compacting + " compactions" +
@@ -863,13 +861,23 @@ public class HRegion implements HeapSize
}
}
}
- // If we were not just flushing, is it worth doing a preflush...one
- // that will clear out of the bulk of the memstore before we put up
- // the close flag?
- if (!abort && !wasFlushing && worthPreFlushing()) {
+
+ // First flush clears content in either snapshots or current memstores
+ if (!abort) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
- internalFlushcache(status);
+
+ try {
+ internalFlushcache(status);
+ } catch (IOException ioe) {
+ // Failed to flush the region, but it is probably still able to serve requests,
+ // so re-enable writes to it.
+ status.setStatus("Failed to flush the region, putting it online again");
+ synchronized (writestate) {
+ writestate.writesEnabled = true;
+ }
+ throw ioe;
+ }
}
newScannerLock.writeLock().lock();
this.closing.set(true);
@@ -885,9 +893,18 @@ public class HRegion implements HeapSize
waitOnRowLocks();
LOG.debug("No more row locks outstanding on region " + this);
- // Don't flush the cache if we are aborting
+ // Second flush to ensure no unflushed data in memory.
if (!abort) {
- internalFlushcache(status);
+ try {
+ internalFlushcache(status);
+ } catch (IOException ioe) {
+ status.setStatus("Failed to flush the region, putting it online again");
+ synchronized (writestate) {
+ writestate.writesEnabled = true;
+ }
+ this.closing.set(false);
+ throw ioe;
+ }
}
List<StoreFile> result = new ArrayList<StoreFile>();
@@ -902,6 +919,7 @@ public class HRegion implements HeapSize
// close each store in parallel
for (final Store store : stores.values()) {
+ assert store.getFlushableMemstoreSize() == 0;
completionService
.submit(new Callable<ImmutableList<StoreFile>>() {
public ImmutableList<StoreFile> call() throws IOException {
@@ -928,6 +946,9 @@ public class HRegion implements HeapSize
}
}
this.closed.set(true);
+ if (memstoreSize.get() != 0) {
+ LOG.error("Memstore size should be 0 after clean region close, but is " + memstoreSize.get());
+ }
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@@ -942,14 +963,6 @@ public class HRegion implements HeapSize
}
}
- /**
- * @return True if its worth doing a flush before we put up the close flag.
- */
- private boolean worthPreFlushing() {
- return this.memstoreSize.get() >
- this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
- }
-
//////////////////////////////////////////////////////////////////////////////
// HRegion accessors
//////////////////////////////////////////////////////////////////////////////
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Feb 12 19:18:33 2014
@@ -2172,7 +2172,7 @@ public class HRegionServer implements HR
"interrupted.", ex);
}
} else {
- LOG.error("unable to process message" +
+ LOG.error("FAILED TO PROCESS MESSAGE FROM MASTER" +
(e != null ? (": " + e.msg.toString()) : ""), ex);
checkFileSystem();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed Feb 12 19:18:33 2014
@@ -84,7 +84,7 @@ public class MemStore implements HeapSiz
// Used to track own heapSize
final AtomicLong size;
- private long snapshotSize;
+ volatile private long snapshotSize;
private AtomicLong numDeletesInKvSet;
private AtomicLong numDeletesInSnapshot;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java?rev=1567725&r1=1567724&r2=1567725&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java Wed Feb 12 19:18:33 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.util.Bytes;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.zookeeper.data.Stat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -96,4 +98,29 @@ public class TestHRegionClose {
public void mainTest() throws Exception {
tryCloseRegion();
}
+
+ @Test
+ public void testMemstoreCleanup() throws Exception {
+ HRegion region = server.getOnlineRegionsAsArray()[0];
+
+ Store store = region.getStore(FAMILIES[0]);
+
+ byte[] row = region.getStartKey();
+ byte[] value = Bytes.toBytes("testMemstoreCleanup");
+ Put put = new Put(row);
+ put.add(FAMILIES[0], null, Bytes.toBytes("testMemstoreCleanup"));
+
+ // First put something in current memstore, which will be in snapshot after flusher.prepare()
+ region.put(put);
+
+ StoreFlusher flusher = store.getStoreFlusher(12345);
+ flusher.prepare();
+
+ // Second put something in current memstore
+ put.add(FAMILIES[0], Bytes.toBytes("abc"), value);
+ region.put(put);
+
+ region.close();
+ assertEquals(0, region.getMemstoreSize().get());
+ }
}