You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by se...@apache.org on 2019/03/29 20:45:03 UTC

[hbase] branch branch-2 updated: HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe

This is an automated email from the ASF dual-hosted git repository.

sershe pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 8bceaf7  HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe
8bceaf7 is described below

commit 8bceaf7f0b16badb6567f17bae0fbca714c8fceb
Author: Sergey Shelukhin <se...@apache.org>
AuthorDate: Fri Mar 29 13:38:11 2019 -0700

    HBASE-22115 HBase RPC aspires to grow an infinite tree of trace scopes; some other places are also unsafe
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>, stack <st...@apache.org>
---
 .../org/apache/hadoop/hbase/ipc/CallRunner.java    |   7 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java | 130 ++++++++++-----------
 2 files changed, 70 insertions(+), 67 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e4763a5..85f58ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.core.TraceScope;
 
 /**
  * The request processing logic, which is usually executed in thread pools provided by an
@@ -115,6 +116,7 @@ public class CallRunner {
       String error = null;
       Pair<Message, CellScanner> resultPair = null;
       RpcServer.CurCall.set(call);
+      TraceScope traceScope = null;
       try {
         if (!this.rpcServer.isStarted()) {
           InetSocketAddress address = rpcServer.getListenerAddress();
@@ -125,7 +127,7 @@ public class CallRunner {
             call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
         String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
         String traceString = serviceName + "." + methodName;
-        TraceUtil.createTrace(traceString);
+        traceScope = TraceUtil.createTrace(traceString);
         // make the call
         resultPair = this.rpcServer.call(call, this.status);
       } catch (TimeoutIOException e){
@@ -147,6 +149,9 @@ public class CallRunner {
           throw (Error)e;
         }
       } finally {
+        if (traceScope != null) {
+          traceScope.close();
+        }
         RpcServer.CurCall.set(null);
         if (resultPair != null) {
           this.rpcServer.addCallSize(call.getSize() * -1);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 134cbae..a53008d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -693,83 +693,81 @@ class MemStoreFlusher implements FlushRequester {
    * amount of memstore consumption.
    */
   public void reclaimMemStoreMemory() {
-    TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory");
-    FlushType flushType = isAboveHighWaterMark();
-    if (flushType != FlushType.NORMAL) {
-      TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
-      long start = EnvironmentEdgeManager.currentTime();
-      long nextLogTimeMs = start;
-      synchronized (this.blockSignal) {
-        boolean blocked = false;
-        long startTime = 0;
-        boolean interrupted = false;
-        try {
-          flushType = isAboveHighWaterMark();
-          while (flushType != FlushType.NORMAL && !server.isStopped()) {
-            server.cacheFlusher.setFlushType(flushType);
-            if (!blocked) {
-              startTime = EnvironmentEdgeManager.currentTime();
-              if (!server.getRegionServerAccounting().isOffheap()) {
-                logMsg("global memstore heapsize",
-                    server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
-                    server.getRegionServerAccounting().getGlobalMemStoreLimit());
-              } else {
-                switch (flushType) {
-                case ABOVE_OFFHEAP_HIGHER_MARK:
-                  logMsg("the global offheap memstore datasize",
-                      server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
-                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
-                  break;
-                case ABOVE_ONHEAP_HIGHER_MARK:
+    try (TraceScope scope = TraceUtil.createTrace("MemStoreFluser.reclaimMemStoreMemory")) {
+      FlushType flushType = isAboveHighWaterMark();
+      if (flushType != FlushType.NORMAL) {
+        TraceUtil.addTimelineAnnotation("Force Flush. We're above high water mark.");
+        long start = EnvironmentEdgeManager.currentTime();
+        long nextLogTimeMs = start;
+        synchronized (this.blockSignal) {
+          boolean blocked = false;
+          long startTime = 0;
+          boolean interrupted = false;
+          try {
+            flushType = isAboveHighWaterMark();
+            while (flushType != FlushType.NORMAL && !server.isStopped()) {
+              server.cacheFlusher.setFlushType(flushType);
+              if (!blocked) {
+                startTime = EnvironmentEdgeManager.currentTime();
+                if (!server.getRegionServerAccounting().isOffheap()) {
                   logMsg("global memstore heapsize",
                       server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
-                      server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
-                  break;
-                default:
-                  break;
+                      server.getRegionServerAccounting().getGlobalMemStoreLimit());
+                } else {
+                  switch (flushType) {
+                    case ABOVE_OFFHEAP_HIGHER_MARK:
+                      logMsg("the global offheap memstore datasize",
+                          server.getRegionServerAccounting().getGlobalMemStoreOffHeapSize(),
+                          server.getRegionServerAccounting().getGlobalMemStoreLimit());
+                      break;
+                    case ABOVE_ONHEAP_HIGHER_MARK:
+                      logMsg("global memstore heapsize",
+                          server.getRegionServerAccounting().getGlobalMemStoreHeapSize(),
+                          server.getRegionServerAccounting().getGlobalOnHeapMemStoreLimit());
+                      break;
+                    default:
+                      break;
+                  }
                 }
               }
+              blocked = true;
+              wakeupFlushThread();
+              try {
+                // we should be able to wait forever, but we've seen a bug where
+                // we miss a notify, so put a 5 second bound on it at least.
+                blockSignal.wait(5 * 1000);
+              } catch (InterruptedException ie) {
+                LOG.warn("Interrupted while waiting");
+                interrupted = true;
+              }
+              long nowMs = EnvironmentEdgeManager.currentTime();
+              if (nowMs >= nextLogTimeMs) {
+                LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
+                nextLogTimeMs = nowMs + 1000;
+              }
+              flushType = isAboveHighWaterMark();
             }
-            blocked = true;
-            wakeupFlushThread();
-            try {
-              // we should be able to wait forever, but we've seen a bug where
-              // we miss a notify, so put a 5 second bound on it at least.
-              blockSignal.wait(5 * 1000);
-            } catch (InterruptedException ie) {
-              LOG.warn("Interrupted while waiting");
-              interrupted = true;
-            }
-            long nowMs = EnvironmentEdgeManager.currentTime();
-            if (nowMs >= nextLogTimeMs) {
-              LOG.warn("Memstore is above high water mark and block {} ms", nowMs - start);
-              nextLogTimeMs = nowMs + 1000;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
             }
-            flushType = isAboveHighWaterMark();
-          }
-        } finally {
-          if (interrupted) {
-            Thread.currentThread().interrupt();
           }
-        }
 
-        if(blocked){
-          final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
-          if(totalTime > 0){
-            this.updatesBlockedMsHighWater.add(totalTime);
+          if(blocked){
+            final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
+            if(totalTime > 0){
+              this.updatesBlockedMsHighWater.add(totalTime);
+            }
+            LOG.info("Unblocking updates for server " + server.toString());
           }
-          LOG.info("Unblocking updates for server " + server.toString());
+        }
+      } else {
+        flushType = isAboveLowWaterMark();
+        if (flushType != FlushType.NORMAL) {
+          server.cacheFlusher.setFlushType(flushType);
+          wakeupFlushThread();
         }
       }
-    } else {
-      flushType = isAboveLowWaterMark();
-      if (flushType != FlushType.NORMAL) {
-        server.cacheFlusher.setFlushType(flushType);
-        wakeupFlushThread();
-      }
-    }
-    if(scope!= null) {
-      scope.close();
     }
   }