You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2019/03/29 20:45:03 UTC
[hbase] branch branch-2 updated: HBASE-22115 HBase RPC aspires to
grow an infinite tree of trace scopes; some other places are also unsafe
This is an automated email from the ASF dual-hosted git repository.
sershe pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 8bceaf7 HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe
8bceaf7 is described below
commit 8bceaf7f0b16badb6567f17bae0fbca714c8fceb
Author: Sergey Shelukhin <se...@apache.org>
AuthorDate: Fri Mar 29 13:38:11 2019 -0700
HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe
Signed-off-by: Andrew Purtell <ap...@apache.org>, stack <st...@apache.org>
---
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 7 +-
.../hadoop/hbase/regionserver/MemStoreFlusher.java | 130 ++++++++++-----------
2 files changed, 70 insertions(+), 67 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e4763a5..85f58ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.core.TraceScope;
/**
* The request processing logic, which is usually executed in thread pools provided by an
@@ -115,6 +116,7 @@ public class CallRunner {
String error = null;
Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call);
+ TraceScope traceScope = null;
try {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -125,7 +127,7 @@ public class CallRunner {
call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
String traceString = serviceName + "." + methodName;
- TraceUtil.createTrace(traceString);
+ traceScope = TraceUtil.createTrace(traceString);
// make the call
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
@@ -147,6 +149,9 @@ public class CallRunner {
throw (Error)e;
}
} finally {
+ if (traceScope != null) {
+ traceScope.close();
+ }
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 134cbae..a53008d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -693,83 +693,81 @@ class MemStoreFlusher implements FlushRequester {
* amount of memstore consumption.
*/
public void reclaimMemStoreMemory() {
- TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory");
- FlushType flushType = isAboveHighWaterMark();
- if (flushType != FlushType.NORMAL) {
- TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
- long start = EnvironmentEdgeManager.currentTime();
- long nextLogTimeMs = start;
- synchronized (this.blockSignal) {
- boolean blocked = false;
- long startTime = 0;
- boolean interrupted = false;
- try {
- flushType = isAboveHighWaterMark();
- while (flushType != FlushType.NORMAL && !server.isStopped()) {
- server.cacheFlusher.setFlushType(flushType);
- if (!blocked) {
- startTime = EnvironmentEdgeManager.currentTime();
- if (!server.getRegionServerAccounting().isOffheap()) {
- logMsg("global memstore heapsize",
- server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
- server.getRegionServerAccounting().getGlobalMemStoreLimit());
- } else {
- switch (flushType) {
- case ABOVE_OFFHEAP_HIGHER_MARK:
- logMsg("the global offheap memstore datasize",
- server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
- server.getRegionServerAccounting().getGlobalMemStoreLimit());
- break;
- case ABOVE_ONHEAP_HIGHER_MARK:
+ try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
+ FlushType flushType = isAboveHighWaterMark();
+ if (flushType != FlushType.NORMAL) {
+ TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
+ long start = EnvironmentEdgeManager.currentTime();
+ long nextLogTimeMs = start;
+ synchronized (this.blockSignal) {
+ boolean blocked = false;
+ long startTime = 0;
+ boolean interrupted = false;
+ try {
+ flushType = isAboveHighWaterMark();
+ while (flushType != FlushType.NORMAL && !server.isStopped()) {
+ server.cacheFlusher.setFlushType(flushType);
+ if (!blocked) {
+ startTime = EnvironmentEdgeManager.currentTime();
+ if (!server.getRegionServerAccounting().isOffheap()) {
logMsg("global memstore heapsize",
server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
- server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
- break;
- default:
- break;
+ server.getRegionServerAccounting().getGlobalMemStoreLimit());
+ } else {
+ switch (flushType) {
+ case ABOVE_OFFHEAP_HIGHER_MARK:
+ logMsg("the global offheap memstore datasize",
+ server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
+ server.getRegionServerAccounting().getGlobalMemStoreLimit());
+ break;
+ case ABOVE_ONHEAP_HIGHER_MARK:
+ logMsg("global memstore heapsize",
+ server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
+ server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
+ break;
+ default:
+ break;
+ }
}
}
+ blocked = true;
+ wakeupFlushThread();
+ try {
+ // we should be able to wait forever, but we've seen a bug where
+ // we miss a notify, so put a 5 second bound on it at least.
+ blockSignal.wait(5 * 1000);
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted while waiting");
+ interrupted = true;
+ }
+ long nowMs = EnvironmentEdgeManager.currentTime();
+ if (nowMs >= nextLogTimeMs) {
+ LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+ nextLogTimeMs = nowMs + 1000;
+ }
+ flushType = isAboveHighWaterMark();
}
- blocked = true;
- wakeupFlushThread();
- try {
- // we should be able to wait forever, but we've seen a bug where
- // we miss a notify, so put a 5 second bound on it at least.
- blockSignal.wait(5 * 1000);
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted while waiting");
- interrupted = true;
- }
- long nowMs = EnvironmentEdgeManager.currentTime();
- if (nowMs >= nextLogTimeMs) {
- LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
- nextLogTimeMs = nowMs + 1000;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
}
- flushType = isAboveHighWaterMark();
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
}
- }
- if(blocked){
- final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
- if(totalTime > 0){
- this.updatesBlockedMsHighWater.add(totalTime);
+ if(blocked){
+ final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
+ if(totalTime > 0){
+ this.updatesBlockedMsHighWater.add(totalTime);
+ }
+ LOG.info("Unblocking updates for server " + server.toString());
}
- LOG.info("Unblocking updates for server " + server.toString());
+ }
+ } else {
+ flushType = isAboveLowWaterMark();
+ if (flushType != FlushType.NORMAL) {
+ server.cacheFlusher.setFlushType(flushType);
+ wakeupFlushThread();
}
}
- } else {
- flushType = isAboveLowWaterMark();
- if (flushType != FlushType.NORMAL) {
- server.cacheFlusher.setFlushType(flushType);
- wakeupFlushThread();
- }
- }
- if(scope!= null) {
- scope.close();
}
}