You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/14 21:36:39 UTC
svn commit: r1577694 - in
/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver:
HRegion.java MemStore.java Store.java
Author: stack
Date: Fri Mar 14 20:36:39 2014
New Revision: 1577694
URL: http://svn.apache.org/r1577694
Log:
HBASE-10514 Forward port HBASE-10466, possible data loss when failed flushes
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Mar 14 20:36:39 2014
@@ -1042,22 +1042,25 @@ public class HRegion implements HeapSize
}
status.setStatus("Disabling compacts and flushes for region");
- boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
- wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
}
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
// the close flag?
- if (!abort && !wasFlushing && worthPreFlushing()) {
+ if (!abort && worthPreFlushing()) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
- internalFlushcache(status);
+ try {
+ internalFlushcache(status);
+ } catch (IOException ioe) {
+ // Failed to flush the region. Keep going.
+ status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
+ }
}
this.closing.set(true);
@@ -1073,7 +1076,30 @@ public class HRegion implements HeapSize
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
if (!abort) {
- internalFlushcache(status);
+ int flushCount = 0;
+ while (this.getMemstoreSize().get() > 0) {
+ try {
+ if (flushCount++ > 0) {
+ int actualFlushes = flushCount - 1;
+ if (actualFlushes > 5) {
+ // If more than 5 flushes have run and memory is still not clear, abort
+ // so we do not lose data
+ throw new DroppedSnapshotException("Failed clearing memory after " +
+ actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
+ }
+ LOG.info("Running extra flush, " + actualFlushes +
+ " (carrying snapshot?) " + this);
+ }
+ internalFlushcache(status);
+ } catch (IOException ioe) {
+ status.setStatus("Failed flush " + this + ", putting online again");
+ synchronized (writestate) {
+ writestate.writesEnabled = true;
+ }
+ // Have to throw to upper layers. I can't abort server from here.
+ throw ioe;
+ }
+ }
}
List<StoreFile> result = new ArrayList<StoreFile>();
@@ -1088,6 +1114,7 @@ public class HRegion implements HeapSize
// close each store in parallel
for (final Store store : stores.values()) {
+ assert abort? true: store.getFlushableSize() == 0;
completionService
.submit(new Callable<ImmutableList<StoreFile>>() {
public ImmutableList<StoreFile> call() throws IOException {
@@ -1111,7 +1138,7 @@ public class HRegion implements HeapSize
}
}
this.closed.set(true);
-
+ if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
@@ -1629,7 +1656,7 @@ public class HRegion implements HeapSize
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
- long flushsize = this.memstoreSize.get();
+ long totalFlushableSize = 0;
status.setStatus("Preparing to flush by snapshotting stores");
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
try {
@@ -1642,6 +1669,7 @@ public class HRegion implements HeapSize
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
for (Store s : stores.values()) {
+ totalFlushableSize += s.getFlushableSize();
storeFlushers.add(s.getStoreFlusher(completeSequenceId));
}
@@ -1653,7 +1681,7 @@ public class HRegion implements HeapSize
this.updatesLock.writeLock().unlock();
}
String s = "Finished snapshotting " + this +
- ", commencing wait for mvcc, flushsize=" + flushsize;
+ ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
status.setStatus(s);
LOG.debug(s);
@@ -1699,7 +1727,7 @@ public class HRegion implements HeapSize
storeFlushers.clear();
// Set down the memstore size by amount of flush.
- this.addAndGetGlobalMemstoreSize(-flushsize);
+ this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
// The hlog needs to be replayed so its content is restored to memstore.
@@ -1739,7 +1767,7 @@ public class HRegion implements HeapSize
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
long memstoresize = this.memstoreSize.get();
String msg = "Finished memstore flush of ~" +
- StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
+ StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
", currentsize=" +
StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
" for region " + this + " in " + time + "ms, sequenceid=" + sequenceId +
@@ -1747,7 +1775,7 @@ public class HRegion implements HeapSize
((wal == null)? "; wal=null": "");
LOG.info(msg);
status.setStatus(msg);
- this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
+ this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
return compactionRequested;
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Mar 14 20:36:39 2014
@@ -88,6 +88,7 @@ public class MemStore implements HeapSiz
// Used to track own heapSize
final AtomicLong size;
+ private volatile long snapshotSize;
// Used to track when to flush
volatile long timeOfOldestEdit = Long.MAX_VALUE;
@@ -122,6 +123,7 @@ public class MemStore implements HeapSiz
timeRangeTracker = new TimeRangeTracker();
snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
+ this.snapshotSize = 0;
if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
this.allocator = new MemStoreLAB(conf);
} else {
@@ -151,6 +153,7 @@ public class MemStore implements HeapSiz
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
if (!this.kvset.isEmpty()) {
+ this.snapshotSize = keySize();
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
@@ -179,6 +182,18 @@ public class MemStore implements HeapSiz
}
/**
+ * On flush, how much memory we will clear.
+ * Flush will first clear out the data in snapshot if any (it will take a second flush
+ * invocation to clear the current kvset). If snapshot is empty, the current
+ * kvset will be flushed.
+ *
+ * @return size of data that is going to be flushed
+ */
+ long getFlushableSize() {
+ return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+ }
+
+ /**
* The passed snapshot was successfully persisted; it can be let go.
* @param ss The snapshot to clean out.
* @throws UnexpectedException
@@ -196,6 +211,7 @@ public class MemStore implements HeapSiz
this.snapshot = new KeyValueSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
+ this.snapshotSize = 0;
}
/**
@@ -877,7 +893,7 @@ public class MemStore implements HeapSiz
}
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+ ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.ATOMIC_LONG +
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1577694&r1=1577693&r2=1577694&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Mar 14 20:36:39 2014
@@ -363,6 +363,11 @@ public class Store extends SchemaConfigu
return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
}
+ public long getFlushableSize() {
+ return this.memstore.getFlushableSize();
+ }
+
+
/**
* @param tabledir
* @param encodedName Encoded region name.
@@ -796,7 +801,7 @@ public class Store extends SchemaConfigu
return pathName;
} catch (Exception e) {
LOG.warn("Failed validating store file " + pathName
- + ", retring num=" + i, e);
+ + ", retrying num=" + i, e);
if (e instanceof IOException) {
lastException = (IOException) e;
} else {
@@ -804,7 +809,7 @@ public class Store extends SchemaConfigu
}
}
} catch (IOException e) {
- LOG.warn("Failed flushing store file, retring num=" + i, e);
+ LOG.warn("Failed flushing store file, retrying num=" + i, e);
lastException = e;
}
if (lastException != null && i < (flush_retries_number - 1)) {