You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/15 10:33:57 UTC
[31/48] hbase git commit: HBASE-19635 Introduce a thread at RS side to call reportProcedureDone
HBASE-19635 Introduce a thread at RS side to call reportProcedureDone
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8d0b8b9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8d0b8b9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8d0b8b9d
Branch: refs/heads/HBASE-19397-branch-2
Commit: 8d0b8b9d1447b723122bad7a801f58071954e44e
Parents: e504f1d
Author: zhangduo <zh...@apache.org>
Authored: Wed Dec 27 20:13:42 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jan 15 18:27:56 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/RegionServerStatus.proto | 5 +-
.../hadoop/hbase/master/MasterRpcServices.java | 15 ++-
.../hbase/regionserver/HRegionServer.java | 72 ++++--------
.../RemoteProcedureResultReporter.java | 111 +++++++++++++++++++
.../handler/RSProcedureHandler.java | 2 +-
5 files changed, 149 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d0b8b9d/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 4f75941..3f836cd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -146,7 +146,7 @@ message RegionSpaceUseReportRequest {
message RegionSpaceUseReportResponse {
}
-message ReportProcedureDoneRequest {
+message RemoteProcedureResult {
required uint64 proc_id = 1;
enum Status {
SUCCESS = 1;
@@ -155,6 +155,9 @@ message ReportProcedureDoneRequest {
required Status status = 2;
optional ForeignExceptionMessage error = 3;
}
+// Results of one or more remote procedures, batched by a region server into a single
+// reportProcedureDone call to the master.
+message ReportProcedureDoneRequest {
+ repeated RemoteProcedureResult result = 1;
+}
+// Empty acknowledgement returned by the master.
message ReportProcedureDoneResponse {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d0b8b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 72bf2d1..377a9c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -265,6 +265,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
@@ -2254,12 +2255,14 @@ public class MasterRpcServices extends RSRpcServices
@Override
public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
ReportProcedureDoneRequest request) throws ServiceException {
- if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
- master.remoteProcedureCompleted(request.getProcId());
- } else {
- master.remoteProcedureFailed(request.getProcId(),
- RemoteProcedureException.fromProto(request.getError()));
- }
+ request.getResultList().forEach(result -> {
+ if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
+ master.remoteProcedureCompleted(result.getProcId());
+ } else {
+ master.remoteProcedureFailed(result.getProcId(),
+ RemoteProcedureException.fromProto(result.getError()));
+ }
+ });
return ReportProcedureDoneResponse.getDefaultInstance();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d0b8b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9ff6295..516a77e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -379,6 +378,9 @@ public class HRegionServer extends HasThread implements
// eclipse warning when accessed by inner classes
protected LogRoller walRoller;
+ // A thread which calls reportProcedureDone
+ private RemoteProcedureResultReporter procedureResultReporter;
+
// flag set after we're done setting up server threads
final AtomicBoolean online = new AtomicBoolean(false);
@@ -1884,6 +1886,7 @@ public class HRegionServer extends HasThread implements
this.walRoller = new LogRoller(this, this);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
+ this.procedureResultReporter = new RemoteProcedureResultReporter(this);
// Create the CompactedFileDischarger chore executorService. This chore helps to
// remove the compacted files
@@ -1927,6 +1930,8 @@ public class HRegionServer extends HasThread implements
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
this.cacheFlusher.start(uncaughtExceptionHandler);
+ Threads.setDaemonThreadRunning(this.procedureResultReporter,
+ getName() + ".procedureResultReporter", uncaughtExceptionHandler);
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
@@ -3709,55 +3714,26 @@ public class HRegionServer extends HasThread implements
executorService.submit(new RSProcedureHandler(this, procId, callable));
}
- public void reportProcedureDone(long procId, Throwable error) {
- ReportProcedureDoneRequest.Builder builder =
- ReportProcedureDoneRequest.newBuilder().setProcId(procId);
- if (error != null) {
- builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
- .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error));
- } else {
- builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
+  /**
+   * Records the result of a remote procedure executed on this region server. The result is
+   * handed to the {@link RemoteProcedureResultReporter} thread, which reports it to the master
+   * asynchronously, so this method does not perform the RPC itself.
+   * @param procId id of the remote procedure
+   * @param error the failure cause, or {@code null} if the procedure succeeded
+   */
+  public void remoteProcedureComplete(long procId, Throwable error) {
+    procedureResultReporter.complete(procId, error);
+  }
+
+  /**
+   * Sends a batch of remote procedure results to the master in a single attempt, creating the
+   * RegionServerStatus stub first if there is none. Retrying is left to the caller.
+   * @param request the batched results to report
+   * @throws IOException if the RPC fails, or if no stub is available while
+   *           {@code keepLooping()} returns false
+   */
+  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
+    RegionServerStatusService.BlockingInterface rss;
+    for (;;) {
+      rss = rssStub;
+      if (rss != null) {
+        break;
+      }
+      // The stub may still be null after createRegionServerStatusStub() returns. Bail out once
+      // keepLooping() is false (as the replaced retry loop did) instead of spinning here forever.
+      if (!keepLooping()) {
+        throw new IOException(
+          "Can not report procedure done, no RegionServerStatus stub available");
+      }
+      createRegionServerStatusStub();
}
- ReportProcedureDoneRequest request = builder.build();
- int tries = 0;
- long pauseTime = INIT_PAUSE_TIME_MS;
- while (keepLooping()) {
- RegionServerStatusService.BlockingInterface rss = rssStub;
- try {
- if (rss == null) {
- createRegionServerStatusStub();
- continue;
- }
- rss.reportProcedureDone(null, request);
- // Log if we had to retry else don't log unless TRACE. We want to
- // know if were successful after an attempt showed in logs as failed.
- if (tries > 0 || LOG.isTraceEnabled()) {
- LOG.info("PROCEDURE REPORTED " + request);
- }
- return;
- } catch (ServiceException se) {
- IOException ioe = ProtobufUtil.getRemoteException(se);
- boolean pause =
- ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
- if (pause) {
- // Do backoff else we flood the Master with requests.
- pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
- } else {
- pauseTime = INIT_PAUSE_TIME_MS; // Reset.
- }
- LOG.info(
- "Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" +
- tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
- : " immediately."),
- ioe);
- if (pause) {
- Threads.sleep(pauseTime);
- }
- tries++;
- if (rssStub == rss) {
- rssStub = null;
- }
+    try {
+      rss.reportProcedureDone(null, request);
+    } catch (ServiceException se) {
+      // Drop the broken stub so the next call re-creates it, unless it was already replaced.
+      if (rssStub == rss) {
+        rssStub = null;
}
+      throw ProtobufUtil.getRemoteException(se);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d0b8b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
new file mode 100644
index 0000000..e4be422
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
+
+/**
+ * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure.
+ */
+@InterfaceAudience.Private
+class RemoteProcedureResultReporter extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class);
+
+ // Time to pause if master says 'please hold'. Make configurable if needed.
+ private static final int INIT_PAUSE_TIME_MS = 1000;
+
+ private static final int MAX_BATCH = 100;
+
+ private final HRegionServer server;
+
+ private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>();
+
+ public RemoteProcedureResultReporter(HRegionServer server) {
+ this.server = server;
+ }
+
+ public void complete(long procId, Throwable error) {
+ RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId);
+ if (error != null) {
+ builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
+ ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error));
+ } else {
+ builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
+ }
+ results.add(builder.build());
+ }
+
+ @Override
+ public void run() {
+ ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder();
+ int tries = 0;
+ while (!server.isStopped()) {
+ if (builder.getResultCount() == 0) {
+ try {
+ builder.addResult(results.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+ while (builder.getResultCount() < MAX_BATCH) {
+ RemoteProcedureResult result = results.poll();
+ if (result == null) {
+ break;
+ }
+ builder.addResult(result);
+ }
+ ReportProcedureDoneRequest request = builder.build();
+ try {
+ server.reportProcedureDone(builder.build());
+ builder.clear();
+ tries = 0;
+ } catch (IOException e) {
+ boolean pause =
+ e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException;
+ long pauseTime;
+ if (pause) {
+ // Do backoff else we flood the Master with requests.
+ pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
+ } else {
+ pauseTime = INIT_PAUSE_TIME_MS; // Reset.
+ }
+ LOG.info("Failed report procedure " + TextFormat.shortDebugString(request) + "; retry (#" +
+ tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
+ : " immediately."),
+ e);
+ Threads.sleep(pauseTime);
+ tries++;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d0b8b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
index 240b0a7..d2175d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
@@ -49,6 +49,6 @@ public class RSProcedureHandler extends EventHandler {
LOG.error("Catch exception when call RSProcedureCallable: ", e);
error = e;
}
- ((HRegionServer) server).reportProcedureDone(procId, error);
+ ((HRegionServer) server).remoteProcedureComplete(procId, error);
}
}