You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/28 01:03:59 UTC

hbase git commit: HBASE-19635 Introduce a thread at RS side to call reportProcedureDone

Repository: hbase
Updated Branches:
  refs/heads/HBASE-19397 264890a91 -> 45c6b14c6


HBASE-19635 Introduce a thread at RS side to call reportProcedureDone


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/45c6b14c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/45c6b14c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/45c6b14c

Branch: refs/heads/HBASE-19397
Commit: 45c6b14c66ec0c2825e34ad475a905e409f41efc
Parents: 264890a
Author: zhangduo <zh...@apache.org>
Authored: Wed Dec 27 20:13:42 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Dec 28 09:02:44 2017 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/RegionServerStatus.proto  |   5 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |  15 ++-
 .../hbase/regionserver/HRegionServer.java       |  80 +++++--------
 .../RemoteProcedureResultReporter.java          | 111 +++++++++++++++++++
 .../handler/RSProcedureHandler.java             |   2 +-
 5 files changed, 152 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/45c6b14c/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 4f75941..3f836cd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -146,7 +146,7 @@ message RegionSpaceUseReportRequest {
 message RegionSpaceUseReportResponse {
 }
 
-message ReportProcedureDoneRequest {
+message RemoteProcedureResult {
   required uint64 proc_id = 1;
   enum Status {
     SUCCESS = 1;
@@ -155,6 +155,9 @@ message ReportProcedureDoneRequest {
   required Status status = 2;
   optional ForeignExceptionMessage error = 3;
 }
+message ReportProcedureDoneRequest {
+  repeated RemoteProcedureResult result = 1;
+}
 
 message ReportProcedureDoneResponse {
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c6b14c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 9f71bab..b8e02f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -265,6 +265,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
@@ -2254,12 +2255,14 @@ public class MasterRpcServices extends RSRpcServices
   @Override
   public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
       ReportProcedureDoneRequest request) throws ServiceException {
-    if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
-      master.remoteProcedureCompleted(request.getProcId());
-    } else {
-      master.remoteProcedureFailed(request.getProcId(),
-        RemoteProcedureException.fromProto(request.getError()));
-    }
+    request.getResultList().forEach(result -> {
+      if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
+        master.remoteProcedureCompleted(result.getProcId());
+      } else {
+        master.remoteProcedureFailed(result.getProcId(),
+          RemoteProcedureException.fromProto(result.getError()));
+      }
+    });
     return ReportProcedureDoneResponse.getDefaultInstance();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c6b14c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 49489d8..50b8378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -47,11 +47,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
-
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.servlet.http.HttpServlet;
-
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -148,7 +146,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -177,6 +174,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
@@ -215,9 +215,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -381,6 +378,9 @@ public class HRegionServer extends HasThread implements
   // eclipse warning when accessed by inner classes
   protected LogRoller walRoller;
 
+  // A thread which calls reportProcedureDone
+  private RemoteProcedureResultReporter procedureResultReporter;
+
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
 
@@ -1900,6 +1900,7 @@ public class HRegionServer extends HasThread implements
 
     this.walRoller = new LogRoller(this, this);
     this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
+    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
 
     // Create the CompactedFileDischarger chore executorService. This chore helps to
     // remove the compacted files
@@ -1943,6 +1944,8 @@ public class HRegionServer extends HasThread implements
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
     uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.procedureResultReporter,
+      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
 
     if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
     if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
@@ -3737,55 +3740,26 @@ public class HRegionServer extends HasThread implements
     executorService.submit(new RSProcedureHandler(this, procId, callable));
   }
 
-  public void reportProcedureDone(long procId, Throwable error) {
-    ReportProcedureDoneRequest.Builder builder =
-      ReportProcedureDoneRequest.newBuilder().setProcId(procId);
-    if (error != null) {
-      builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
-          .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error));
-    } else {
-      builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
+  public void remoteProcedureComplete(long procId, Throwable error) {
+    procedureResultReporter.complete(procId, error);
+  }
+
+  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    for (;;) {
+      rss = rssStub;
+      if (rss != null) {
+        break;
+      }
+      createRegionServerStatusStub();
     }
-    ReportProcedureDoneRequest request = builder.build();
-    int tries = 0;
-    long pauseTime = INIT_PAUSE_TIME_MS;
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        rss.reportProcedureDone(null, request);
-        // Log if we had to retry else don't log unless TRACE. We want to
-        // know if were successful after an attempt showed in logs as failed.
-        if (tries > 0 || LOG.isTraceEnabled()) {
-          LOG.info("PROCEDURE REPORTED " + request);
-        }
-        return;
-      } catch (ServiceException se) {
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        boolean pause =
-          ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
-        if (pause) {
-          // Do backoff else we flood the Master with requests.
-          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
-        } else {
-          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
-        }
-        LOG.info(
-          "Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" +
-            tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
-              : " immediately."),
-          ioe);
-        if (pause) {
-          Threads.sleep(pauseTime);
-        }
-        tries++;
-        if (rssStub == rss) {
-          rssStub = null;
-        }
+    try {
+      rss.reportProcedureDone(null, request);
+    } catch (ServiceException se) {
+      if (rssStub == rss) {
+        rssStub = null;
       }
+      throw ProtobufUtil.getRemoteException(se);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c6b14c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
new file mode 100644
index 0000000..e4be422
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
+
+/**
+ * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure.
+ */
+@InterfaceAudience.Private
+class RemoteProcedureResultReporter extends Thread {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class);
+
+  // Time to pause if master says 'please hold'. Make configurable if needed.
+  private static final int INIT_PAUSE_TIME_MS = 1000;
+
+  private static final int MAX_BATCH = 100;
+
+  private final HRegionServer server;
+
+  private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>();
+
+  public RemoteProcedureResultReporter(HRegionServer server) {
+    this.server = server;
+  }
+
+  public void complete(long procId, Throwable error) {
+    RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId);
+    if (error != null) {
+      builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
+        ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error));
+    } else {
+      builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
+    }
+    results.add(builder.build());
+  }
+
+  @Override
+  public void run() {
+    ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder();
+    int tries = 0;
+    while (!server.isStopped()) {
+      if (builder.getResultCount() == 0) {
+        try {
+          builder.addResult(results.take());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          continue;
+        }
+      }
+      while (builder.getResultCount() < MAX_BATCH) {
+        RemoteProcedureResult result = results.poll();
+        if (result == null) {
+          break;
+        }
+        builder.addResult(result);
+      }
+      ReportProcedureDoneRequest request = builder.build();
+      try {
+        server.reportProcedureDone(builder.build());
+        builder.clear();
+        tries = 0;
+      } catch (IOException e) {
+        boolean pause =
+          e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException;
+        long pauseTime;
+        if (pause) {
+          // Do backoff else we flood the Master with requests.
+          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
+        } else {
+          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
+        }
+        LOG.info("Failed report procedure " + TextFormat.shortDebugString(request) + "; retry (#" +
+          tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
+            : " immediately."),
+          e);
+        Threads.sleep(pauseTime);
+        tries++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/45c6b14c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
index 240b0a7..d2175d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
@@ -49,6 +49,6 @@ public class RSProcedureHandler extends EventHandler {
       LOG.error("Catch exception when call RSProcedureCallable: ", e);
       error = e;
     }
-    ((HRegionServer) server).reportProcedureDone(procId, error);
+    ((HRegionServer) server).remoteProcedureComplete(procId, error);
   }
 }