You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/07/25 15:58:06 UTC
[hbase] branch branch-1 updated: HBASE-22132 Backport HBASE-22115
to branch-1 (#395)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 2be03e1 HBASE-22132 Backport HBASE-22115 to branch-1 (#395)
2be03e1 is described below
commit 2be03e130bc46011ec1a07853f7a8b357b503b9a
Author: Viraj Jasani <vj...@salesforce.com>
AuthorDate: Thu Jul 25 21:27:59 2019 +0530
HBASE-22132 Backport HBASE-22115 to branch-1 (#395)
Signed-off-by: Reid Chan <re...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 97 +++++++++++-----------
1 file changed, 49 insertions(+), 48 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index debca0f..a92a275 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -579,63 +579,64 @@ class MemStoreFlusher implements FlushRequester {
* amount of memstore consumption.
*/
public void reclaimMemStoreMemory() {
- TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
- if (isAboveHighWaterMark()) {
- if (Trace.isTracing()) {
- scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
- }
- long start = EnvironmentEdgeManager.currentTime();
- long nextLogTimeMs = start;
- synchronized (this.blockSignal) {
- boolean blocked = false;
- long startTime = 0;
- boolean interrupted = false;
- try {
- while (isAboveHighWaterMark() && !server.isStopped()) {
- if (!blocked) {
- startTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Blocking updates on "
- + server.toString()
- + ": the global memstore size "
- + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
- .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
- + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
- }
- blocked = true;
- wakeupFlushThread();
- try {
- // we should be able to wait forever, but we've seen a bug where
- // we miss a notify, so put a 5 second bound on it at least.
- blockSignal.wait(5 * 1000);
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted while waiting");
- interrupted = true;
+ try (TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory")) {
+ if (isAboveHighWaterMark()) {
+ if (Trace.isTracing()) {
+ scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
+ }
+ long start = EnvironmentEdgeManager.currentTime();
+ long nextLogTimeMs = start;
+ synchronized (this.blockSignal) {
+ boolean blocked = false;
+ long startTime = 0;
+ boolean interrupted = false;
+ try {
+ while (isAboveHighWaterMark() && !server.isStopped()) {
+ if (!blocked) {
+ startTime = EnvironmentEdgeManager.currentTime();
+ LOG.info("Blocking updates on "
+ + server.toString()
+ + ": the global memstore size "
+ + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
+ .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
+ + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
+ }
+ blocked = true;
+ wakeupFlushThread();
+ try {
+ // we should be able to wait forever, but we've seen a bug where
+ // we miss a notify, so put a 5 second bound on it at least.
+ blockSignal.wait(5 * 1000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted while waiting");
+ interrupted = true;
+ }
+ long nowMs = EnvironmentEdgeManager.currentTime();
+ if (nowMs >= nextLogTimeMs) {
+ LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms");
+ nextLogTimeMs = nowMs + 1000;
+ }
}
- long nowMs = EnvironmentEdgeManager.currentTime();
- if (nowMs >= nextLogTimeMs) {
- LOG.warn("Memstore is above high water mark and block " + (nowMs - start) + " ms");
- nextLogTimeMs = nowMs + 1000;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
}
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- if(blocked){
- final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
- if(totalTime > 0){
- this.updatesBlockedMsHighWater.add(totalTime);
+ if (blocked) {
+ final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
+ if (totalTime > 0) {
+ this.updatesBlockedMsHighWater.add(totalTime);
+ }
+ LOG.info("Unblocking updates for server " + server.toString());
}
- LOG.info("Unblocking updates for server " + server.toString());
}
+ } else if (isAboveLowWaterMark()) {
+ wakeupFlushThread();
}
- } else if (isAboveLowWaterMark()) {
- wakeupFlushThread();
}
- scope.close();
}
+
@Override
public String toString() {
return "flush_queue="