You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/03/28 09:57:06 UTC
[hbase] 33/49: HBASE-22074 Should use procedure store to persist
the state in reportRegionStateTransition
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ae0b6b6e04e9d65d28de0443ac2b27eaf63c655b
Author: zhangduo <zh...@apache.org>
AuthorDate: Tue Mar 26 11:28:41 2019 +0800
HBASE-22074 Should use procedure store to persist the state in reportRegionStateTransition
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 32 +--
.../hbase/shaded/protobuf/RequestConverter.java | 28 +--
.../src/main/protobuf/Admin.proto | 3 +-
.../src/main/protobuf/MasterProcedure.proto | 11 ++
.../src/main/protobuf/RegionServerStatus.proto | 1 +
.../hbase/master/assignment/AssignProcedure.java | 6 +-
.../hbase/master/assignment/AssignmentManager.java | 12 +-
.../master/assignment/CloseRegionProcedure.java | 26 ++-
.../master/assignment/OpenRegionProcedure.java | 60 +++++-
.../assignment/RegionRemoteProcedureBase.java | 219 ++++++++++++++++-----
.../assignment/TransitRegionStateProcedure.java | 208 ++++---------------
.../hbase/master/assignment/UnassignProcedure.java | 4 +-
.../master/procedure/RSProcedureDispatcher.java | 54 ++---
.../hadoop/hbase/regionserver/HRegionServer.java | 10 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 8 +-
.../hbase/regionserver/RegionServerServices.java | 31 ++-
.../hadoop/hbase/regionserver/SplitRequest.java | 5 +-
.../regionserver/handler/AssignRegionHandler.java | 14 +-
.../regionserver/handler/CloseRegionHandler.java | 3 +-
.../regionserver/handler/OpenRegionHandler.java | 14 +-
.../handler/UnassignRegionHandler.java | 15 +-
.../assignment/TestAssignmentManagerBase.java | 7 +
.../assignment/TestCloseRegionWhileRSCrash.java | 14 +-
.../assignment/TestOpenRegionProcedureHang.java | 209 ++++++++++++++++++++
.../procedure/TestServerRemoteProcedure.java | 16 +-
25 files changed, 636 insertions(+), 374 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 15a8c8a..336c59c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -3003,28 +3003,32 @@ public final class ProtobufUtil {
}
/**
- * Create a CloseRegionRequest for a given region name
- *
- * @param regionName the name of the region to close
- * @return a CloseRegionRequest
- */
- public static CloseRegionRequest buildCloseRegionRequest(ServerName server,
- final byte[] regionName) {
- return ProtobufUtil.buildCloseRegionRequest(server, regionName, null);
- }
+ * Create a CloseRegionRequest for a given region name
+ * @param regionName the name of the region to close
+ * @return a CloseRegionRequest
+ */
+ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName) {
+ return ProtobufUtil.buildCloseRegionRequest(server, regionName, null);
+ }
+
+ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
+ ServerName destinationServer) {
+ return buildCloseRegionRequest(server, regionName, destinationServer, -1);
+ }
- public static CloseRegionRequest buildCloseRegionRequest(ServerName server,
- final byte[] regionName, ServerName destinationServer) {
+ public static CloseRegionRequest buildCloseRegionRequest(ServerName server, byte[] regionName,
+ ServerName destinationServer, long closeProcId) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
- RegionSpecifier region = RequestConverter.buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
+ RegionSpecifier region =
+ RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- if (destinationServer != null){
+ if (destinationServer != null) {
builder.setDestinationServer(toServerName(destinationServer));
}
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
+ builder.setCloseProcId(closeProcId);
return builder.build();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 36c8fab..0c58d4b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -941,27 +941,6 @@ public final class RequestConverter {
}
/**
- * Create a protocol buffer OpenRegionRequest to open a list of regions
- * @param server the serverName for the RPC
- * @param regionOpenInfos info of a list of regions to open
- * @return a protocol buffer OpenRegionRequest
- */
- public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
- final List<Pair<RegionInfo, List<ServerName>>> regionOpenInfos) {
- OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
- for (Pair<RegionInfo, List<ServerName>> regionOpenInfo : regionOpenInfos) {
- builder.addOpenInfo(buildRegionOpenInfo(regionOpenInfo.getFirst(),
- regionOpenInfo.getSecond()));
- }
- if (server != null) {
- builder.setServerStartCode(server.getStartcode());
- }
- // send the master's wall clock time as well, so that the RS can refer to it
- builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
- return builder.build();
- }
-
- /**
* Create a protocol buffer OpenRegionRequest for a given region
* @param server the serverName for the RPC
* @param region the region to open
@@ -971,7 +950,7 @@ public final class RequestConverter {
public static OpenRegionRequest buildOpenRegionRequest(ServerName server,
final RegionInfo region, List<ServerName> favoredNodes) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
- builder.addOpenInfo(buildRegionOpenInfo(region, favoredNodes));
+ builder.addOpenInfo(buildRegionOpenInfo(region, favoredNodes, -1L));
if (server != null) {
builder.setServerStartCode(server.getStartcode());
}
@@ -1622,8 +1601,8 @@ public final class RequestConverter {
/**
* Create a RegionOpenInfo based on given region info and version of offline node
*/
- public static RegionOpenInfo buildRegionOpenInfo(
- final RegionInfo region, final List<ServerName> favoredNodes) {
+ public static RegionOpenInfo buildRegionOpenInfo(RegionInfo region, List<ServerName> favoredNodes,
+ long openProcId) {
RegionOpenInfo.Builder builder = RegionOpenInfo.newBuilder();
builder.setRegion(ProtobufUtil.toRegionInfo(region));
if (favoredNodes != null) {
@@ -1631,6 +1610,7 @@ public final class RequestConverter {
builder.addFavoredNodes(ProtobufUtil.toServerName(server));
}
}
+ builder.setOpenProcId(openProcId);
return builder.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index c622d58..85b9113 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -88,6 +88,7 @@ message OpenRegionRequest {
repeated ServerName favored_nodes = 3;
// open region for distributedLogReplay
// optional bool DEPRECATED_openForDistributedLogReplay = 4;
+ optional int64 open_proc_id = 5 [default = -1];
}
}
@@ -102,7 +103,6 @@ message OpenRegionResponse {
}
message WarmupRegionRequest {
-
required RegionInfo regionInfo = 1;
}
@@ -120,6 +120,7 @@ message CloseRegionRequest {
optional ServerName destination_server = 4;
// the intended server for this RPC.
optional uint64 serverStartCode = 5;
+ optional int64 close_proc_id = 6 [default = -1];
}
message CloseRegionResponse {
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 64ac398..d5a390c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -28,6 +28,7 @@ import "HBase.proto";
import "RPC.proto";
import "Snapshot.proto";
import "Replication.proto";
+import "RegionServerStatus.proto";
// ============================================================================
// WARNING - Compatibility rules
@@ -548,9 +549,19 @@ message RegionStateTransitionStateData {
required bool force_new_plan = 3;
}
+enum RegionRemoteProcedureBaseState {
+ REGION_REMOTE_PROCEDURE_DISPATCH = 1;
+ REGION_REMOTE_PROCEDURE_REPORT_SUCCEED = 2;
+ REGION_REMOTE_PROCEDURE_DISPATCH_FAIL = 3;
+ REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
+}
+
message RegionRemoteProcedureBaseStateData {
required RegionInfo region = 1;
required ServerName target_server = 2;
+ required RegionRemoteProcedureBaseState state = 3;
+ optional RegionStateTransition.TransitionCode transition_code = 4;
+ optional int64 seq_id = 5;
}
message OpenRegionProcedureStateData {
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 002432a..0137cb1 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -96,6 +96,7 @@ message RegionStateTransition {
/** For newly opened region, the open seq num is needed */
optional uint64 open_seq_num = 3;
+ repeated int64 proc_id = 4;
enum TransitionCode {
OPENED = 0;
FAILED_OPEN = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
index 33a3545..35510d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
* @deprecated Do not use any more.
* @see TransitRegionStateProcedure
*/
-// TODO: Add being able to assign a region to open read-only.
@Deprecated
@InterfaceAudience.Private
public class AssignProcedure extends RegionTransitionProcedure {
@@ -121,9 +119,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
@Override
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
final ServerName serverName) {
- assert serverName.equals(getRegionState(env).getRegionLocation());
- return new RegionOpenOperation(this, getRegionInfo(),
- env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+ return null;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 2d0c3be..5e43637 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -835,8 +835,10 @@ public class AssignmentManager {
case CLOSED:
assert transition.getRegionInfoCount() == 1 : transition;
final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
+ long procId =
+ transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
updateRegionTransition(serverName, transition.getTransitionCode(), hri,
- transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM);
+ transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
break;
case READY_TO_SPLIT:
case SPLIT:
@@ -903,7 +905,7 @@ public class AssignmentManager {
}
private void updateRegionTransition(ServerName serverName, TransitionCode state,
- RegionInfo regionInfo, long seqId) throws IOException {
+ RegionInfo regionInfo, long seqId, long procId) throws IOException {
checkMetaLoaded(regionInfo);
RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
@@ -919,7 +921,7 @@ public class AssignmentManager {
ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
regionNode.lock();
try {
- if (!reportTransition(regionNode, serverNode, state, seqId)) {
+ if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
// Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
// 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
// rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
@@ -941,14 +943,14 @@ public class AssignmentManager {
}
private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
- TransitionCode state, long seqId) throws IOException {
+ TransitionCode state, long seqId, long procId) throws IOException {
ServerName serverName = serverNode.getServerName();
TransitRegionStateProcedure proc = regionNode.getProcedure();
if (proc == null) {
return false;
}
proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
- serverName, state, seqId);
+ serverName, state, seqId, procId);
return true;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
index f867e96..4fd60f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/CloseRegionProcedure.java
@@ -20,15 +20,17 @@ package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.CloseRegionProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* The remote procedure used to close a region.
@@ -46,9 +48,9 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
super();
}
- public CloseRegionProcedure(RegionInfo region, ServerName targetServer,
- ServerName assignCandidate) {
- super(region, targetServer);
+ public CloseRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
+ ServerName targetServer, ServerName assignCandidate) {
+ super(parent, region, targetServer);
this.assignCandidate = assignCandidate;
}
@@ -59,7 +61,7 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
- return new RegionCloseOperation(this, region, assignCandidate);
+ return new RegionCloseOperation(this, region, getProcId(), assignCandidate);
}
@Override
@@ -88,7 +90,17 @@ public class CloseRegionProcedure extends RegionRemoteProcedureBase {
}
@Override
- protected boolean shouldDispatch(RegionStateNode regionNode) {
- return regionNode.isInState(RegionState.State.CLOSING);
+ protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
+ long seqId) throws IOException {
+ if (transitionCode != TransitionCode.CLOSED) {
+ throw new UnexpectedStateException("Received report unexpected " + transitionCode +
+ " transition, " + regionNode.toShortString() + ", " + this + ", expected CLOSED.");
+ }
+ }
+
+ @Override
+ protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
+ TransitionCode transitionCode, long seqId) throws IOException {
+ env.getAssignmentManager().regionClosed(regionNode, true);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
index 4b3a976..579b757 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/OpenRegionProcedure.java
@@ -20,15 +20,18 @@ package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.OpenRegionProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* The remote procedure used to open a region.
@@ -36,12 +39,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.O
@InterfaceAudience.Private
public class OpenRegionProcedure extends RegionRemoteProcedureBase {
+ private static final Logger LOG = LoggerFactory.getLogger(OpenRegionProcedure.class);
+
public OpenRegionProcedure() {
super();
}
- public OpenRegionProcedure(RegionInfo region, ServerName targetServer) {
- super(region, targetServer);
+ public OpenRegionProcedure(TransitRegionStateProcedure parent, RegionInfo region,
+ ServerName targetServer) {
+ super(parent, region, targetServer);
}
@Override
@@ -51,8 +57,7 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
@Override
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
- return new RegionOpenOperation(this, region, env.getAssignmentManager().getFavoredNodes(region),
- false);
+ return new RegionOpenOperation(this, region, getProcId());
}
@Override
@@ -73,7 +78,48 @@ public class OpenRegionProcedure extends RegionRemoteProcedureBase {
}
@Override
- protected boolean shouldDispatch(RegionStateNode regionNode) {
- return regionNode.isInState(RegionState.State.OPENING);
+ protected void reportTransition(RegionStateNode regionNode, TransitionCode transitionCode,
+ long seqId) throws IOException {
+ switch (transitionCode) {
+ case OPENED:
+ // this is the openSeqNum
+ if (seqId < 0) {
+ throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
+ " transition openSeqNum=" + seqId + ", " + regionNode + ", proc=" + this);
+ }
+ break;
+ case FAILED_OPEN:
+ // nothing to check
+ break;
+ default:
+ throw new UnexpectedStateException(
+ "Received report unexpected " + transitionCode + " transition, " +
+ regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
+ }
+ }
+
+ @Override
+ protected void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
+ TransitionCode transitionCode, long openSeqNum) throws IOException {
+ switch (transitionCode) {
+ case OPENED:
+ if (openSeqNum < regionNode.getOpenSeqNum()) {
+ LOG.warn(
+ "Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
+ " is less than the current one {}, ignoring...",
+ transitionCode, targetServer, regionNode, getProcId(), openSeqNum,
+ regionNode.getOpenSeqNum());
+ } else {
+ regionNode.setOpenSeqNum(openSeqNum);
+ }
+ env.getAssignmentManager().regionOpened(regionNode);
+ break;
+ case FAILED_OPEN:
+ env.getAssignmentManager().regionFailedOpen(regionNode, false);
+ break;
+ default:
+ throw new UnexpectedStateException("Unexpected transition code: " + transitionCode);
+ }
+
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
index f6d3a2e..4a6f375 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
@@ -36,7 +39,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
* The base class for the remote procedures used to open/close a region.
@@ -53,16 +59,25 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
protected RegionInfo region;
- private ServerName targetServer;
+ protected ServerName targetServer;
- private boolean dispatched;
+ private RegionRemoteProcedureBaseState state =
+ RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
+
+ private TransitionCode transitionCode;
+
+ private long seqId;
+
+ private int attempt;
protected RegionRemoteProcedureBase() {
}
- protected RegionRemoteProcedureBase(RegionInfo region, ServerName targetServer) {
+ protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
+ ServerName targetServer) {
this.region = region;
this.targetServer = targetServer;
+ parent.attachRemoteProc(this);
}
@Override
@@ -86,22 +101,26 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
try {
- LOG.warn("The remote operation {} for region {} to server {} failed", this, region,
- targetServer, exception);
- // This could happen as the RSProcedureDispatcher and dead server processor are executed in
- // different threads. It is possible that we have already scheduled SCP for the targetServer
- // and woken up this procedure, and assigned the region to another RS, and then the
- // RSProcedureDispatcher notices that the targetServer is dead so it can not send the request
- // out and call remoteCallFailed, which makes us arrive here, especially that if the target
- // machine is completely down, which means you can only receive a ConnectionTimeout after a
- // very long time(depends on the timeout settings and in HBase usually it will be at least 15
- // seconds, or even 1 minute). So here we need to check whether we are stilling waiting on the
- // given event, if not, this means that we have already been woken up so do not wake it up
- // again.
- if (!regionNode.getProcedureEvent().wakeIfSuspended(env.getProcedureScheduler(), this)) {
- LOG.warn("{} is not waiting on the event for region {}, targer server = {}, ignore.", this,
- region, targetServer);
+ if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
+ // the SCP will interrupt us, give up
+ LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
+ regionNode, remote);
+ return;
}
+ if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
+ // not sure how can this happen but anyway let's add a check here to avoid waking the wrong
+ // procedure...
+ LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
+ regionNode, remote);
+ return;
+ }
+ LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
+ remote, exception);
+ // It is OK to not persist the state here, as we do not need to change the region state if the
+ // remote call is failed. If the master crashed before we actually execute the procedure and
+ // persist the new state, it is fine to retry on the same target server again.
+ state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
+ regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
} finally {
regionNode.unlock();
}
@@ -133,46 +152,127 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
return false;
}
- /**
- * Check whether we still need to make the call to RS.
- * <p/>
- * This could happen when master restarts. Since we do not know whether a request has already been
- * sent to the region server after we add a remote operation to the dispatcher, so the safe way is
- * to not persist the dispatched field and try to add the remote operation again. But it is
- * possible that we do have already sent the request to region server and it has also sent back
- * the response, so here we need to check the region state, if it is not in the expecting state,
- * we should give up, otherwise we may hang for ever, as the region server will just ignore
- * redundant calls.
- */
- protected abstract boolean shouldDispatch(RegionStateNode regionNode);
+ // do some checks to see if the report is valid, without actually updating meta.
+ protected abstract void reportTransition(RegionStateNode regionNode,
+ TransitionCode transitionCode, long seqId) throws IOException;
+
+ // A bit strange but the procedure store will throw RuntimeException if we can not persist the
+ // state, so upper layer should take care of this...
+ private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
+ env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
+ regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
+ }
+
+ // should be called with RegionStateNode locked, to avoid race with the execute method below
+ void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
+ TransitionCode transitionCode, long seqId) throws IOException {
+ if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
+ // should be a retry
+ return;
+ }
+ if (!targetServer.equals(serverName)) {
+ throw new UnexpectedStateException("Received report from " + serverName + ", expected " +
+ targetServer + ", " + regionNode + ", proc=" + this);
+ }
+ reportTransition(regionNode, transitionCode, seqId);
+ // this state means we have received the report from RS, does not mean the result is fine, as we
+ // may received a FAILED_OPEN.
+ this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
+ this.transitionCode = transitionCode;
+ this.seqId = seqId;
+ // Persist the transition code and openSeqNum(if provided).
+ // We should not update the hbase:meta directly as this may cause races when master restarts,
+ // as the old active master may incorrectly report back to RS and cause the new master to hang
+ // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
+ boolean succ = false;
+ try {
+ persistAndWake(env, regionNode);
+ succ = true;
+ } finally {
+ if (!succ) {
+ this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
+ this.transitionCode = null;
+ this.seqId = HConstants.NO_SEQNUM;
+ }
+ }
+ }
+
+ void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
+ if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
+ // should be a retry
+ return;
+ }
+ this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
+ boolean succ = false;
+ try {
+ persistAndWake(env, regionNode);
+ succ = true;
+ } finally {
+ if (!succ) {
+ this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
+ }
+ }
+ }
+
+ private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
+ return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
+ .getProcedure(getParentProcId());
+ }
+
+ private void unattach(MasterProcedureEnv env) {
+ getParent(env).unattachRemoteProc(this);
+ }
+
+ // actually update the state to meta
+ protected abstract void updateTransition(MasterProcedureEnv env, RegionStateNode regionNode,
+ TransitionCode transitionCode, long seqId) throws IOException;
@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- // we are done, the parent procedure will check whether we are succeeded.
- return null;
- }
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
try {
- if (!shouldDispatch(regionNode)) {
- return null;
+ switch (state) {
+ case REGION_REMOTE_PROCEDURE_DISPATCH: {
+ // The code which wakes us up also needs to lock the RSN so here we do not need to
+ // synchronize
+ // on the event.
+ ProcedureEvent<?> event = regionNode.getProcedureEvent();
+ try {
+ env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+ } catch (FailedRemoteDispatchException e) {
+ LOG.warn("Can not add remote operation {} for region {} to server {}, this usually " +
+ "because the server is alread dead, give up and mark the procedure as complete, " +
+ "the parent procedure will take care of this.", this, region, targetServer, e);
+ unattach(env);
+ return null;
+ }
+ event.suspend();
+ event.suspendIfNotReady(this);
+ throw new ProcedureSuspendedException();
+ }
+ case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
+ updateTransition(env, regionNode, transitionCode, seqId);
+ unattach(env);
+ return null;
+ case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
+ // the remote call is failed so we do not need to change the region state, just return.
+ unattach(env);
+ return null;
+ case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
+ env.getAssignmentManager().regionClosed(regionNode, false);
+ unattach(env);
+ return null;
+ default:
+ throw new IllegalStateException("Unknown state: " + state);
}
- // The code which wakes us up also needs to lock the RSN so here we do not need to synchronize
- // on the event.
- ProcedureEvent<?> event = regionNode.getProcedureEvent();
- try {
- env.getRemoteDispatcher().addOperationToNode(targetServer, this);
- } catch (FailedRemoteDispatchException e) {
- LOG.warn("Can not add remote operation {} for region {} to server {}, this usually " +
- "because the server is alread dead, give up and mark the procedure as complete, " +
- "the parent procedure will take care of this.", this, region, targetServer, e);
- return null;
- }
- dispatched = true;
- event.suspend();
- event.suspendIfNotReady(this);
+ } catch (IOException e) {
+ long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
+ LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
throw new ProcedureSuspendedException();
} finally {
regionNode.unlock();
@@ -186,9 +286,14 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
+ // Serializes the region, the target server and the current RegionRemoteProcedureBaseState.
+ // The transition code and seqId are written only once a transition report has been recorded
+ // (transitionCode != null); the deserialization code reads them back only if hasTransitionCode().
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- serializer.serialize(
+ RegionRemoteProcedureBaseStateData.Builder builder =
RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
- .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
+ if (transitionCode != null) {
+ builder.setTransitionCode(transitionCode);
+ builder.setSeqId(seqId);
+ }
+ serializer.serialize(builder.build());
}
@Override
@@ -197,5 +302,15 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur
serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
region = ProtobufUtil.toRegionInfo(data.getRegion());
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ state = data.getState();
+ if (data.hasTransitionCode()) {
+ transitionCode = data.getTransitionCode();
+ seqId = data.getSeqId();
+ }
+ }
+
+ // After the procedure store is replayed, re-links this procedure into its parent so that the
+ // parent's reportTransition/serverCrashed calls are delegated to it again. The link lives in the
+ // parent's remoteProc field and presumably is not restored by deserialization itself — confirm.
+ @Override
+ protected void afterReplay(MasterProcedureEnv env) {
+ getParent(env).attachRemoteProc(this);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index d3429b5..1be7a9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.master.assignment;
-import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
-import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;
-
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseIOException;
@@ -28,7 +25,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
@@ -121,6 +117,8 @@ public class TransitRegionStateProcedure
private int attempt;
+ private RegionRemoteProcedureBase remoteProc;
+
public TransitRegionStateProcedure() {
}
@@ -143,6 +141,7 @@ public class TransitRegionStateProcedure
throw new IllegalArgumentException("Unknown TransitionType: " + type);
}
}
+
@VisibleForTesting
protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
ServerName assignCandidate, boolean forceNewPlan, TransitionType type) {
@@ -204,21 +203,18 @@ public class TransitRegionStateProcedure
return;
}
env.getAssignmentManager().regionOpening(regionNode);
- addChildProcedure(new OpenRegionProcedure(getRegion(), loc));
+ addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc));
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED);
}
private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
- // notice that, for normal case, if we successfully opened a region, we will not arrive here, as
- // in reportTransition we will call unsetProcedure, and in executeFromState we will return
- // directly. But if the master is crashed before we finish the procedure, then next time we will
- // arrive here. So we still need to add code for normal cases.
if (regionNode.isInState(State.OPEN)) {
attempt = 0;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
// we are the last state, finish
regionNode.unsetProcedure(this);
+ ServerCrashProcedure.updateProgress(env, getParentProcId());
return Flow.NO_MORE_STATE;
}
// It is possible that we arrive here but confirm opened is not the last state, for example,
@@ -250,8 +246,8 @@ public class TransitRegionStateProcedure
if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) {
// this is the normal case
env.getAssignmentManager().regionClosing(regionNode);
- addChildProcedure(
- new CloseRegionProcedure(getRegion(), regionNode.getRegionLocation(), assignCandidate));
+ addChildProcedure(new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(),
+ assignCandidate));
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
} else {
forceNewPlan = true;
@@ -262,10 +258,6 @@ public class TransitRegionStateProcedure
private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
- // notice that, for normal case, if we successfully opened a region, we will not arrive here, as
- // in reportTransition we will call unsetProcedure, and in executeFromState we will return
- // directly. But if the master is crashed before we finish the procedure, then next time we will
- // arrive here. So we still need to add code for normal cases.
if (regionNode.isInState(State.CLOSED)) {
attempt = 0;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
@@ -324,14 +316,6 @@ public class TransitRegionStateProcedure
protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
RegionStateNode regionNode = getRegionStateNode(env);
- if (regionNode.getProcedure() != this) {
- // This is possible, and is the normal case, as we will call unsetProcedure in
- // reportTransition, this means we have already done
- // This is because that, when we mark the region as OPENED or CLOSED, then all the works
- // should have already been done, and logically we could have another TRSP scheduled for this
- // region immediately(think of a RS crash at the point...).
- return Flow.NO_MORE_STATE;
- }
try {
switch (state) {
case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
@@ -373,171 +357,47 @@ public class TransitRegionStateProcedure
return false; // 'false' means that this procedure handled the timeout
}
- private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
- TransitionCode code) {
- if (!regionNode.isInState(State.OPENING)) {
- LOG.warn("Received report {} transition from {} for {}, pid={}, but the region is not in" +
- " OPENING state, should be a retry, ignore", code, serverName, regionNode, getProcId());
- return false;
- }
- if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_OPENED) {
+ /**
+ * Handles a region state transition reported by a region server. Should be called with the
+ * RegionStateNode locked.
+ * <p/>
+ * The report is logged and ignored as a probable retry when no remote procedure is attached, or
+ * when the reported procId is non-negative and differs from the attached remote procedure's pid.
+ * Otherwise it is delegated to the attached RegionRemoteProcedureBase.
+ */
+ public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
+ ServerName serverName, TransitionCode code, long seqId, long procId) throws IOException {
+ if (remoteProc == null) {
LOG.warn(
- "Received report {} transition from {} for {}, pid={}," +
- " but the TRSP is not in {} state, should be a retry, ignore",
- code, serverName, regionNode, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
- return false;
- }
- return true;
- }
-
- private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
- ServerName serverName, long openSeqNum) throws IOException {
- if (!isOpening(regionNode, serverName, TransitionCode.OPENED)) {
+ "There is no outstanding remote region procedure for {}, serverName={}, code={}," +
+ " seqId={}, proc={}, should be a retry, ignore",
+ regionNode, serverName, code, seqId, this);
return;
}
- if (openSeqNum < 0) {
- throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
- " transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
- }
- if (openSeqNum < regionNode.getOpenSeqNum()) {
- // use the openSeqNum as a fence, if this is not a retry, then the openSeqNum should be
- // greater than or equal to the existing one.
+ // The procId could be -1 if it is from an old region server, we need to deal with it so that we
+ // can do rolling upgrading.
+ // NOTE(review): only negative procIds skip this check; a report carrying 0 would be compared
+ // against the real pid and ignored — confirm no reporting path sends 0 as a placeholder.
+ if (procId >= 0 && remoteProc.getProcId() != procId) {
LOG.warn(
- "Received report {} transition from {} for {}, pid={} but the new openSeqNum {}" +
- " is less than the current one {}, should be a retry, ignore",
- TransitionCode.OPENED, serverName, regionNode, getProcId(), openSeqNum,
- regionNode.getOpenSeqNum());
- return;
- }
- // notice that it is possible for a region to still have the same openSeqNum if it crashes and
- // we haven't written anything into it. That's why we can not just change the above condition
- // from '<' to '<='. So here we still need to check whether the serverName
- // matches, to determine whether this is a retry when the openSeqNum is not changed.
- if (!regionNode.getRegionLocation().equals(serverName)) {
- LOG.warn("Received report {} transition from {} for {}, pid={} but the region is not on it," +
- " should be a retry, ignore", TransitionCode.OPENED, serverName, regionNode, getProcId());
+ "The pid of remote region procedure for {} is {}, the reported pid={}, serverName={}," +
+ " code={}, seqId={}, proc={}, should be a retry, ignore",
+ regionNode, remoteProc.getProcId(), procId, serverName, code, seqId, this);
return;
}
- regionNode.setOpenSeqNum(openSeqNum);
- env.getAssignmentManager().regionOpened(regionNode);
- if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
- // if parent procedure is ServerCrashProcedure, update progress
- ServerCrashProcedure.updateProgress(env, getParentProcId());
- // we are done
- regionNode.unsetProcedure(this);
- }
- regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
+ remoteProc.reportTransition(env, regionNode, serverName, code, seqId);
}
- private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
- ServerName serverName) {
- if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
- return;
- }
- // there is no openSeqNum for FAILED_OPEN, so we will check the target server instead
- if (!regionNode.getRegionLocation().equals(serverName)) {
- LOG.warn(
- "Received report {} transition from {} for {}, pid={}," +
- " but the region is not on it, should be a retry, ignore",
- TransitionCode.FAILED_OPEN, regionNode, serverName, getProcId());
- return;
- }
- // just wake up the procedure and see if we can retry
- // Notice that, even if we arrive here, this call could still be a retry, as we may retry
- // opening on the same server again. And the assumption here is that, once the region state is
- // OPENING, and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must have
- // been suspended on the procedure event, so after the waking operation here, the TRSP will be
- // executed and try to schedule new OpenRegionProcedure again. Once there is a successful open
- // then we are done, so the TRSP will not be stuck.
- // TODO: maybe we could send the procedure id of the OpenRegionProcedure to the region server
- // and let the region server send it back when done, so it will be easy to detect whether this
- // is a retry.
- regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
- }
-
- // we do not need seqId for closing a region
- private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
+ /**
+ * Handles a reported crash of region server serverName for this region. Should be called with
+ * the RegionStateNode locked.
+ * <p/>
+ * If a remote procedure is attached, the crash is forwarded to it. Otherwise the region is marked
+ * closed via AssignmentManager.regionClosed(regionNode, false), to be handled later by this
+ * procedure.
+ */
+ public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName) throws IOException {
- if (!regionNode.isInState(State.CLOSING)) {
- LOG.warn(
- "Received report {} transition from {} for {}, pid={}" +
- ", but the region is not in CLOSING state, should be a retry, ignore",
- TransitionCode.CLOSED, serverName, regionNode, getProcId());
- return;
- }
- if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
- LOG.warn(
- "Received report {} transition from {} for {}, pid={} but the proc is not in {}" +
- " state, should be a retry, ignore",
- TransitionCode.CLOSED, serverName, regionNode, getProcId(),
- REGION_STATE_TRANSITION_CONFIRM_CLOSED);
- return;
- }
- if (!regionNode.getRegionLocation().equals(serverName)) {
- LOG.warn(
- "Received report {} transition from {} for {}, pid={}," +
- " but the region is not on it, should be a retry, ignore",
- TransitionCode.CLOSED, serverName, regionNode, getProcId());
- return;
- }
- env.getAssignmentManager().regionClosed(regionNode, true);
- if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
- // we are done
- regionNode.unsetProcedure(this);
+ if (remoteProc != null) {
+ // this means we are waiting for the sub procedure, so wake it up
+ remoteProc.serverCrashed(env, regionNode, serverName);
+ } else {
+ // we are in RUNNING state, just update the region state, and we will process it later.
+ env.getAssignmentManager().regionClosed(regionNode, false);
}
- regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}
- // Should be called with RegionStateNode locked
- public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
- ServerName serverName, TransitionCode code, long seqId) throws IOException {
- // It is possible that the previous reportRegionStateTransition call was succeeded at master
- // side, but before returning the result to region server, the rpc connection was broken, or the
- // master restarted. The region server will try calling reportRegionStateTransition again under
- // this scenario, so here we need to check whether this is a retry.
- switch (code) {
- case OPENED:
- reportTransitionOpen(env, regionNode, serverName, seqId);
- break;
- case FAILED_OPEN:
- reportTransitionFailedOpen(env, regionNode, serverName);
- break;
- case CLOSED:
- reportTransitionClosed(env, regionNode, serverName);
- break;
- default:
- throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
- regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
- }
+ // Sets the remote procedure that reportTransition and serverCrashed delegate to. Also called from
+ // RegionRemoteProcedureBase.afterReplay to restore the link after the procedure store is replayed.
+ void attachRemoteProc(RegionRemoteProcedureBase proc) {
+ this.remoteProc = proc;
}
- // Should be called with RegionStateNode locked
- public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode,
- ServerName serverName) throws IOException {
- // Notice that, in this method, we do not change the procedure state, instead, we update the
- // region state in hbase:meta. This is because that, the procedure state change will not be
- // persisted until the region is woken up and finish one step, if we crash before that then the
- // information will be lost. So here we will update the region state in hbase:meta, and when the
- // procedure is woken up, it will process the error and jump to the correct procedure state.
- RegionStateTransitionState currentState = getCurrentState();
- switch (currentState) {
- case REGION_STATE_TRANSITION_CLOSE:
- case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
- case REGION_STATE_TRANSITION_CONFIRM_OPENED:
- // for these 3 states, the region may still be online on the crashed server
- env.getAssignmentManager().regionClosed(regionNode, false);
- if (currentState != RegionStateTransitionState.REGION_STATE_TRANSITION_CLOSE) {
- regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
- }
- break;
- default:
- // If the procedure is in other 2 states, then actually we should not arrive here, as we
- // know that the region is not online on any server, so we need to do nothing... But anyway
- // let's add a log here
- LOG.warn("{} received unexpected server crash call for region {} from {}", this, regionNode,
- serverName);
-
- }
+ // Clears the attached remote procedure. The identity check is only a Java assert, so with
+ // assertions disabled a mismatching proc would still clear the link.
+ // NOTE(review): consider logging or failing explicitly on mismatch instead of relying on assert.
+ void unattachRemoteProc(RegionRemoteProcedureBase proc) {
+ assert this.remoteProc == proc;
+ this.remoteProc = null;
}
private boolean incrementAndCheckMaxAttempts(MasterProcedureEnv env, RegionStateNode regionNode) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
index def8fd5..6f5c4af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
@@ -129,8 +128,7 @@ public class UnassignProcedure extends RegionTransitionProcedure {
@Override
public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
final ServerName serverName) {
- assert serverName.equals(getRegionState(env).getRegionLocation());
- return new RegionCloseOperation(this, getRegionInfo(), this.destinationServer);
+ return null;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index b8ba7b3..b469cb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -399,54 +399,36 @@ public class RSProcedureDispatcher
}
+ // Base class for region open/close operations sent to a region server. Besides the region it
+ // carries procId, which buildRegionOpenInfoRequest/buildCloseRegionRequest include in the request.
+ // NOTE(review): this change removes the public getRegionInfo() accessor; code outside this class
+ // hierarchy that called it will no longer compile.
public static abstract class RegionOperation extends RemoteOperation {
- private final RegionInfo regionInfo;
+ protected final RegionInfo regionInfo;
+ // presumably the pid of the issuing remote procedure — verify against the callers
+ protected final long procId;
- protected RegionOperation(final RemoteProcedure remoteProcedure,
- final RegionInfo regionInfo) {
+ protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) {
super(remoteProcedure);
this.regionInfo = regionInfo;
- }
-
- public RegionInfo getRegionInfo() {
- return this.regionInfo;
+ this.procId = procId;
}
}
+ // Operation asking a region server to open a region. Favored nodes are looked up from the
+ // AssignmentManager when the request is built rather than stored on the operation.
public static class RegionOpenOperation extends RegionOperation {
- private final List<ServerName> favoredNodes;
- private final boolean openForReplay;
- private boolean failedOpen;
- public RegionOpenOperation(final RemoteProcedure remoteProcedure,
- final RegionInfo regionInfo, final List<ServerName> favoredNodes,
- final boolean openForReplay) {
- super(remoteProcedure, regionInfo);
- this.favoredNodes = favoredNodes;
- this.openForReplay = openForReplay;
- }
-
- protected void setFailedOpen(final boolean failedOpen) {
- this.failedOpen = failedOpen;
- }
-
- public boolean isFailedOpen() {
- return failedOpen;
+ public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo,
+ long procId) {
+ super(remoteProcedure, regionInfo, procId);
}
+ // Builds the RegionOpenInfo for the open request: the region, its favored nodes as known to the
+ // AssignmentManager, and procId so the region server can report it back with the transition.
public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
final MasterProcedureEnv env) {
- return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
- env.getAssignmentManager().getFavoredNodes(getRegionInfo()));
+ return RequestConverter.buildRegionOpenInfo(regionInfo,
+ env.getAssignmentManager().getFavoredNodes(regionInfo), procId);
}
}
public static class RegionCloseOperation extends RegionOperation {
private final ServerName destinationServer;
- private boolean closed = false;
- public RegionCloseOperation(final RemoteProcedure remoteProcedure,
- final RegionInfo regionInfo, final ServerName destinationServer) {
- super(remoteProcedure, regionInfo);
+ // procId is kept by RegionOperation and later included in the CloseRegionRequest;
+ // destinationServer is forwarded in the same request.
+ public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId,
+ ServerName destinationServer) {
+ super(remoteProcedure, regionInfo, procId);
this.destinationServer = destinationServer;
}
@@ -454,17 +436,9 @@ public class RSProcedureDispatcher
return destinationServer;
}
- protected void setClosed(final boolean closed) {
- this.closed = closed;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
+ // Builds the CloseRegionRequest for serverName with the region name, the destination server and
+ // procId (presumably what the region server reads back via request.getCloseProcId()).
public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
- return ProtobufUtil.buildCloseRegionRequest(serverName,
- getRegionInfo().getRegionName(), getDestinationServer());
+ return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(),
+ getDestinationServer(), procId);
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e407285..bcb1a07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2228,9 +2228,11 @@ public class HRegionServer extends HasThread implements
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
HRegion r = context.getRegion();
+ long openProcId = context.getOpenProcId();
long masterSystemTime = context.getMasterSystemTime();
rpcServices.checkOpen();
- LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
+ LOG.info("Post open deploy tasks for {}, openProcId={}, masterSystemTime={}",
+ r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
// Do checks to see if we need to compact (references or too many files)
for (HStore s : r.stores.values()) {
if (s.hasReferences() || s.needsCompaction()) {
@@ -2247,7 +2249,7 @@ public class HRegionServer extends HasThread implements
// Notify master
if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
- openSeqNum, masterSystemTime, r.getRegionInfo()))) {
+ openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
throw new IOException(
"Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
}
@@ -2263,6 +2265,7 @@ public class HRegionServer extends HasThread implements
long openSeqNum = context.getOpenSeqNum();
long masterSystemTime = context.getMasterSystemTime();
RegionInfo[] hris = context.getHris();
+ long[] procIds = context.getProcIds();
if (TEST_SKIP_REPORTING_TRANSITION) {
// This is for testing only in case there is no master
@@ -2301,6 +2304,9 @@ public class HRegionServer extends HasThread implements
for (RegionInfo hri: hris) {
transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
}
+ for (long procId: procIds) {
+ transition.addProcId(procId);
+ }
ReportRegionStateTransitionRequest request = builder.build();
int tries = 0;
long pauseTime = INIT_PAUSE_TIME_MS;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9b99ff8..2054ac7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3714,8 +3714,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.updateRegionFavoredNodesMapping(regionInfo.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
}
- regionServer.executorService
- .submit(AssignRegionHandler.create(regionServer, regionInfo, tableDesc, masterSystemTime));
+ regionServer.executorService.submit(AssignRegionHandler.create(regionServer, regionInfo,
+ regionOpenInfo.getOpenProcId(), tableDesc, masterSystemTime));
}
}
@@ -3729,8 +3729,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ServerName destination =
request.hasDestinationServer() ? ProtobufUtil.toServerName(request.getDestinationServer())
: null;
- regionServer.executorService
- .submit(UnassignRegionHandler.create(regionServer, encodedName, false, destination));
+ regionServer.executorService.submit(UnassignRegionHandler.create(regionServer, encodedName,
+ request.getCloseProcId(), false, destination));
}
private void executeProcedures(RemoteProcedureRequest request) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index e0638ac..17f318b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -100,16 +100,23 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
*/
class PostOpenDeployContext {
private final HRegion region;
+ private final long openProcId;
private final long masterSystemTime;
- @InterfaceAudience.Private
- public PostOpenDeployContext(HRegion region, long masterSystemTime) {
+ // openProcId is presumably the pid of the master procedure that requested the open; it is passed
+ // back to the master in the OPENED transition report built by postOpenDeployTasks.
+ public PostOpenDeployContext(HRegion region, long openProcId, long masterSystemTime) {
this.region = region;
+ this.openProcId = openProcId;
this.masterSystemTime = masterSystemTime;
}
+
public HRegion getRegion() {
return region;
}
+
+ public long getOpenProcId() {
+ return openProcId;
+ }
+
public long getMasterSystemTime() {
return masterSystemTime;
}
@@ -125,28 +132,46 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
private final TransitionCode code;
private final long openSeqNum;
private final long masterSystemTime;
+ private final long[] procIds;
private final RegionInfo[] hris;
- @InterfaceAudience.Private
+ // Multi-region form (used e.g. for READY_TO_SPLIT with the parent and both daughters). procIds
+ // gets one entry per region, left at the Java default of 0.
+ // NOTE(review): TransitRegionStateProcedure.reportTransition only skips its pid check for
+ // negative procIds, so if this constructor is ever used for an OPENED/FAILED_OPEN/CLOSED report
+ // the 0 entries would be compared as real pids; consider filling the array with -1 instead.
public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long masterSystemTime,
RegionInfo... hris) {
this.code = code;
this.openSeqNum = openSeqNum;
this.masterSystemTime = masterSystemTime;
this.hris = hris;
+ this.procIds = new long[hris.length];
}
+
+ // Single-region form that also carries the procId to report to the master; on the master side a
+ // negative procId skips the pid check.
+ public RegionStateTransitionContext(TransitionCode code, long openSeqNum, long procId,
+ long masterSystemTime, RegionInfo hri) {
+ this.code = code;
+ this.openSeqNum = openSeqNum;
+ this.masterSystemTime = masterSystemTime;
+ this.hris = new RegionInfo[] { hri };
+ this.procIds = new long[] { procId };
+ }
+
+ // Plain accessors. getHris() and getProcIds() return the internal arrays without copying.
public TransitionCode getCode() {
return code;
}
+
public long getOpenSeqNum() {
return openSeqNum;
}
+
public long getMasterSystemTime() {
return masterSystemTime;
}
+
public RegionInfo[] getHris() {
return hris;
}
+
+ public long[] getProcIds() {
+ return procIds;
+ }
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
index 9a0531c..9e806bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.security.PrivilegedAction;
-
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -30,7 +29,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
@@ -87,7 +88,7 @@ class SplitRequest implements Runnable {
// hri_a and hri_b objects may not reflect the regions that will be created, those objects
// are created just to pass the information to the reportRegionStateTransition().
if (!server.reportRegionStateTransition(new RegionStateTransitionContext(
- TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, parent, hri_a, hri_b))) {
+ TransitionCode.READY_TO_SPLIT, HConstants.NO_SEQNUM, -1, parent, hri_a, hri_b))) {
LOG.error("Unable to ask master to split " + parent.getRegionNameAsString());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index c6fee2e..bc2425b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -51,16 +51,19 @@ public class AssignRegionHandler extends EventHandler {
private final RegionInfo regionInfo;
+ private final long openProcId;
+
private final TableDescriptor tableDesc;
private final long masterSystemTime;
private final RetryCounter retryCounter;
- public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo,
+ public AssignRegionHandler(RegionServerServices server, RegionInfo regionInfo, long openProcId,
@Nullable TableDescriptor tableDesc, long masterSystemTime, EventType eventType) {
super(server, eventType);
this.regionInfo = regionInfo;
+ this.openProcId = openProcId;
this.tableDesc = tableDesc;
this.masterSystemTime = masterSystemTime;
this.retryCounter = HandlerUtil.getRetryCounter();
@@ -76,7 +79,7 @@ public class AssignRegionHandler extends EventHandler {
RegionServerServices rs = getServer();
rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes(), Boolean.TRUE);
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.FAILED_OPEN,
- HConstants.NO_SEQNUM, masterSystemTime, regionInfo))) {
+ HConstants.NO_SEQNUM, openProcId, masterSystemTime, regionInfo))) {
throw new IOException(
"Failed to report failed open to master: " + regionInfo.getRegionNameAsString());
}
@@ -133,7 +136,7 @@ public class AssignRegionHandler extends EventHandler {
cleanUpAndReportFailure(e);
return;
}
- rs.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
+ rs.postOpenDeployTasks(new PostOpenDeployContext(region, openProcId, masterSystemTime));
rs.addRegion(region);
LOG.info("Opened {}", regionName);
Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
@@ -156,7 +159,7 @@ public class AssignRegionHandler extends EventHandler {
}
public static AssignRegionHandler create(RegionServerServices server, RegionInfo regionInfo,
- TableDescriptor tableDesc, long masterSystemTime) {
+ long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
EventType eventType;
if (regionInfo.isMetaRegion()) {
eventType = EventType.M_RS_CLOSE_META;
@@ -166,6 +169,7 @@ public class AssignRegionHandler extends EventHandler {
} else {
eventType = EventType.M_RS_OPEN_REGION;
}
- return new AssignRegionHandler(server, regionInfo, tableDesc, masterSystemTime, eventType);
+ return new AssignRegionHandler(server, regionInfo, openProcId, tableDesc, masterSystemTime,
+ eventType);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index 0e35a0b..d4ea004 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
@@ -123,7 +124,7 @@ public class CloseRegionHandler extends EventHandler {
this.rsServices.removeRegion(region, destination);
rsServices.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
- HConstants.NO_SEQNUM, -1, regionInfo));
+ HConstants.NO_SEQNUM, Procedure.NO_PROC_ID, -1, regionInfo));
// Done! Region is closed on this RS
LOG.debug("Closed " + region.getRegionInfo().getRegionNameAsString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index 31177ef..8b644b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -156,15 +157,14 @@ public class OpenRegionHandler extends EventHandler {
}
}
- private void doCleanUpOnFailedOpen(HRegion region)
- throws IOException {
+ private void doCleanUpOnFailedOpen(HRegion region) throws IOException {
try {
if (region != null) {
cleanupFailedOpen(region);
}
} finally {
rsServices.reportRegionStateTransition(new RegionStateTransitionContext(
- TransitionCode.FAILED_OPEN, HConstants.NO_SEQNUM, -1, regionInfo));
+ TransitionCode.FAILED_OPEN, HConstants.NO_SEQNUM, Procedure.NO_PROC_ID, -1, regionInfo));
}
}
@@ -248,19 +248,19 @@ public class OpenRegionHandler extends EventHandler {
@Override
public void run() {
try {
- this.services.postOpenDeployTasks(new PostOpenDeployContext(region, masterSystemTime));
+ this.services.postOpenDeployTasks(
+ new PostOpenDeployContext(region, Procedure.NO_PROC_ID, masterSystemTime));
} catch (Throwable e) {
String msg = "Exception running postOpenDeployTasks; region=" +
this.region.getRegionInfo().getEncodedName();
this.exception = e;
- if (e instanceof IOException
- && isRegionStillOpening(region.getRegionInfo(), services)) {
+ if (e instanceof IOException && isRegionStillOpening(region.getRegionInfo(), services)) {
server.abort(msg, e);
} else {
LOG.warn(msg, e);
}
}
- // We're done. Set flag then wake up anyone waiting on thread to complete.
+ // We're done. Set flag then wake up anyone waiting on thread to complete.
this.signaller.set(true);
synchronized (this.signaller) {
this.signaller.notify();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index cd38db1..3ce7caa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -49,18 +49,22 @@ public class UnassignRegionHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(UnassignRegionHandler.class);
private final String encodedName;
+
+ private final long closeProcId;
// If true, the hosting server is aborting. Region close process is different
// when we are aborting.
+ // TODO: not used yet, we still use the old CloseRegionHandler when aborting
private final boolean abort;
private final ServerName destination;
private final RetryCounter retryCounter;
- public UnassignRegionHandler(RegionServerServices server, String encodedName, boolean abort,
- @Nullable ServerName destination, EventType eventType) {
+ public UnassignRegionHandler(RegionServerServices server, String encodedName, long closeProcId,
+ boolean abort, @Nullable ServerName destination, EventType eventType) {
super(server, eventType);
this.encodedName = encodedName;
+ this.closeProcId = closeProcId;
this.abort = abort;
this.destination = destination;
this.retryCounter = HandlerUtil.getRetryCounter();
@@ -117,7 +121,7 @@ public class UnassignRegionHandler extends EventHandler {
}
rs.removeRegion(region, destination);
if (!rs.reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.CLOSED,
- HConstants.NO_SEQNUM, -1, region.getRegionInfo()))) {
+ HConstants.NO_SEQNUM, closeProcId, -1, region.getRegionInfo()))) {
throw new IOException("Failed to report close to master: " + regionName);
}
rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
@@ -131,13 +135,14 @@ public class UnassignRegionHandler extends EventHandler {
}
public static UnassignRegionHandler create(RegionServerServices server, String encodedName,
- boolean abort, @Nullable ServerName destination) {
+ long closeProcId, boolean abort, @Nullable ServerName destination) {
// Just try our best to determine whether it is for closing meta. It is not the end of the world
// if we put the handler into a wrong executor.
Region region = server.getRegion(encodedName);
EventType eventType =
region != null && region.getRegionInfo().isMetaRegion() ? EventType.M_RS_CLOSE_META
: EventType.M_RS_CLOSE_REGION;
- return new UnassignRegionHandler(server, encodedName, abort, destination, eventType);
+ return new UnassignRegionHandler(server, encodedName, closeProcId, abort, destination,
+ eventType);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
index fb6668a..9f3aceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -400,6 +400,13 @@ public abstract class TestAssignmentManagerBase {
if (retries == timeoutTimes) {
LOG.info("Mark server=" + server + " as dead. retries=" + retries);
master.getServerManager().moveFromOnlineToDeadServers(server);
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Sending in CRASH of " + server);
+ doCrash(server);
+ }
+ }, 1, TimeUnit.SECONDS);
}
throw new SocketTimeoutException("simulate socket timeout");
} else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
index d34bfbb..0a29958 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
@@ -167,7 +168,7 @@ public class TestCloseRegionWhileRSCrash {
HRegionServer dstRs = UTIL.getOtherRegionServer(srcRs);
ProcedureExecutor<MasterProcedureEnv> procExec =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
- long dummyProcId = procExec.submitProcedure(new DummyServerProcedure(srcRs.getServerName()));
+ procExec.submitProcedure(new DummyServerProcedure(srcRs.getServerName()));
ARRIVE.await();
UTIL.getMiniHBaseCluster().killRegionServer(srcRs.getServerName());
UTIL.waitFor(30000,
@@ -185,13 +186,12 @@ public class TestCloseRegionWhileRSCrash {
30000);
// wait until the timeout value increase three times
ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
- // let's close the connection to make sure that the SCP can not update meta successfully
- UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
+ // close connection to make sure that we can not finish the TRSP
+ HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+ master.getConnection().close();
RESUME.countDown();
- UTIL.waitFor(30000, () -> procExec.isFinished(dummyProcId));
- Thread.sleep(2000);
- // here we restart
- UTIL.getMiniHBaseCluster().stopMaster(0).join();
+ UTIL.waitFor(30000, () -> !master.isAlive());
+ // here we start a new master
UTIL.getMiniHBaseCluster().startMaster();
t.join();
// Make sure that the region is online, it may not on the original target server, as we will set
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java
new file mode 100644
index 0000000..0463721
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureHang.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.assignment;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+
+/**
+ * Tests that the OpenRegionProcedure and its parent procedure can still finish after the active
+ * master is replaced while it is processing the OPENED report of the region being moved.
+ * <p/>
+ * The OPENED report is blocked in the old master, the old master's ZooKeeper connection is closed
+ * so that the backup master becomes active, then the blocked report is released and finally the
+ * old master is allowed to abort. See HBASE-22060 and HBASE-22074 for more details.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestOpenRegionProcedureHang {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestOpenRegionProcedureHang.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestOpenRegionProcedureHang.class);
+
+  // These latches are assigned and reset to null from different threads (the test thread, the
+  // thread running reportRegionStateTransition and the aborting thread), so they are volatile.
+  private static volatile CountDownLatch ARRIVE;
+  private static volatile CountDownLatch RESUME;
+
+  private static volatile CountDownLatch FINISH;
+
+  private static volatile CountDownLatch ABORT;
+
+  private static final class AssignmentManagerForTest extends AssignmentManager {
+
+    public AssignmentManagerForTest(MasterServices master) {
+      super(master);
+    }
+
+    /**
+     * Blocks an OPENED report for the test table (while {@code ARRIVE} is set) until
+     * {@code RESUME} is counted down, and counts down {@code FINISH} once the real implementation
+     * returns or throws. All other reports are passed straight to the real implementation.
+     */
+    @Override
+    public ReportRegionStateTransitionResponse reportRegionStateTransition(
+        ReportRegionStateTransitionRequest req) throws PleaseHoldException {
+      RegionStateTransition transition = req.getTransition(0);
+      // read once, the test thread resets ARRIVE to null after it has been counted down
+      CountDownLatch arrive = ARRIVE;
+      if (transition.getTransitionCode() == TransitionCode.OPENED &&
+        ProtobufUtil.toTableName(transition.getRegionInfo(0).getTableName()).equals(NAME) &&
+        arrive != null) {
+        arrive.countDown();
+        try {
+          RESUME.await();
+          RESUME = null;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        try {
+          return super.reportRegionStateTransition(req);
+        } finally {
+          FINISH.countDown();
+        }
+      } else {
+        return super.reportRegionStateTransition(req);
+      }
+    }
+  }
+
+  public static final class HMasterForTest extends HMaster {
+
+    public HMasterForTest(Configuration conf) throws IOException, KeeperException {
+      super(conf);
+    }
+
+    @Override
+    protected AssignmentManager createAssignmentManager(MasterServices master) {
+      return new AssignmentManagerForTest(master);
+    }
+
+    @Override
+    public void abort(String reason, Throwable cause) {
+      // hang here so we can finish the reportRegionStateTransition call, which is the most
+      // important part to reproduce the bug. Read ABORT once, in case abort is invoked from more
+      // than one thread and another caller resets it to null between the check and the await.
+      CountDownLatch abort = ABORT;
+      if (abort != null) {
+        try {
+          abort.await();
+          ABORT = null;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+      super.abort(reason, cause);
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName NAME = TableName.valueOf("Open");
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
+
+    // make sure we do not timeout when calling reportRegionStateTransition
+    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10 * 60 * 1000);
+    conf.setInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, 10 * 60 * 1000);
+    UTIL
+      .startMiniCluster(StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).build());
+    UTIL.createTable(NAME, CF);
+    UTIL.waitTableAvailable(NAME);
+    UTIL.getAdmin().balancerSwitch(false, true);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, KeeperException, IOException {
+    RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
+    AssignmentManager am = UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager();
+
+    HRegionServer rs1 = UTIL.getRSForFirstRegionInTable(NAME);
+    HRegionServer rs2 = UTIL.getOtherRegionServer(rs1);
+
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    FINISH = new CountDownLatch(1);
+    ABORT = new CountDownLatch(1);
+    am.moveAsync(new RegionPlan(region, rs1.getServerName(), rs2.getServerName()));
+
+    ARRIVE.await();
+    ARRIVE = null;
+    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+    master.getZooKeeper().close();
+    UTIL.waitFor(30000, () -> {
+      for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
+        if (mt.getMaster() != master && mt.getMaster().isActiveMaster()) {
+          return mt.getMaster().isInitialized();
+        }
+      }
+      return false;
+    });
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    UTIL.waitFor(30000,
+      () -> procExec.getProcedures().stream().filter(p -> p instanceof OpenRegionProcedure)
+        .map(p -> (OpenRegionProcedure) p).anyMatch(p -> p.region.getTable().equals(NAME)));
+    OpenRegionProcedure proc = procExec.getProcedures().stream()
+      .filter(p -> p instanceof OpenRegionProcedure).map(p -> (OpenRegionProcedure) p)
+      .filter(p -> p.region.getTable().equals(NAME)).findFirst().get();
+    // wait a bit to let the OpenRegionProcedure send out the request
+    Thread.sleep(2000);
+    RESUME.countDown();
+    if (!FINISH.await(15, TimeUnit.SECONDS)) {
+      LOG.info("Wait reportRegionStateTransition to finish timed out, this is possible if" +
+        " we update the procedure store, as the WALProcedureStore" +
+        " will retry forever to roll the writer if it is not closed");
+    }
+    // FINISH is intentionally not reset to null: if the wait above timed out, the blocked
+    // reportRegionStateTransition call may still count it down later.
+    // if the reportRegionTransition is finished, wait a bit to let it return the data to RS
+    Thread.sleep(2000);
+    ABORT.countDown();
+
+    UTIL.waitFor(30000, () -> procExec.isFinished(proc.getProcId()));
+    UTIL.waitFor(30000, () -> procExec.isFinished(proc.getParentProcId()));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
index d4745b9..f03794a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.procedure;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SWITCH_RPC_THROTTLE;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.NavigableMap;
@@ -31,7 +32,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MockMasterServices;
import org.apache.hadoop.hbase.master.assignment.OpenRegionProcedure;
+import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
@@ -62,6 +63,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
@Category({ MasterTests.class, MediumTests.class })
@@ -134,19 +136,21 @@ public class TestServerRemoteProcedure {
}
@Test
- public void testRegionOpenProcedureIsNotHandledByDisPatcher() throws Exception {
+ public void testRegionOpenProcedureIsNotHandledByDispatcher() throws Exception {
TableName tableName = TableName.valueOf("testRegionOpenProcedureIsNotHandledByDisPatcher");
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(1))
- .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
- master.getMasterProcedureExecutor().getEnvironment().getAssignmentManager().getRegionStates()
- .getOrCreateRegionStateNode(hri);
+ .setEndKey(Bytes.toBytes(2)).setSplit(false).setRegionId(0).build();
+ MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(hri);
+ TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, hri, null);
ServerName worker = master.getServerManager().getOnlineServersList().get(0);
- OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(hri, worker);
+ OpenRegionProcedure openRegionProcedure = new OpenRegionProcedure(proc, hri, worker);
Future<byte[]> future = submitProcedure(openRegionProcedure);
Thread.sleep(2000);
rsDispatcher.removeNode(worker);
try {
future.get(2000, TimeUnit.MILLISECONDS);
+ fail();
} catch (TimeoutException e) {
LOG.info("timeout is expected");
}