Posted to commits@hbase.apache.org by st...@apache.org on 2017/05/24 07:20:33 UTC
[10/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)
Move to a new AssignmentManager, one that describes assignment using a state machine built on
top of the ProcedureV2 facility.
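
For orientation, the ProcedureV2 state-machine shape this patch builds on looks roughly like the
toy sketch below. It is a minimal illustration, not code from this commit: DemoProcedure,
DemoState, and DemoEnv are hypothetical stand-ins, and it assumes the hbase-procedure module
(org.apache.hadoop.hbase.procedure2) on the classpath.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;

    // Hypothetical, empty environment type used only for this sketch.
    class DemoEnv {}

    class DemoProcedure extends StateMachineProcedure<DemoEnv, DemoProcedure.DemoState> {
      enum DemoState { START, WORK, FINISH }

      @Override
      protected Flow executeFromState(DemoEnv env, DemoState state) {
        switch (state) {
        case START:
          setNextState(DemoState.WORK);    // each transition is persisted to the proc store
          return Flow.HAS_MORE_STATE;
        case WORK:
          setNextState(DemoState.FINISH);  // resumable from here if the master restarts
          return Flow.HAS_MORE_STATE;
        case FINISH:
          return Flow.NO_MORE_STATE;       // procedure is done
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
        }
      }

      @Override protected void rollbackState(DemoEnv env, DemoState state) { /* undo one step */ }
      @Override protected DemoState getState(int stateId) { return DemoState.values()[stateId]; }
      @Override protected int getStateId(DemoState state) { return state.ordinal(); }
      @Override protected DemoState getInitialState() { return DemoState.START; }
      @Override protected boolean abort(DemoEnv env) { return false; }  // not abortable
      @Override protected void serializeStateData(OutputStream s) throws IOException {
        super.serializeStateData(s);   // persists the state-machine position; no extra state here
      }
      @Override protected void deserializeStateData(InputStream s) throws IOException {
        super.deserializeStateData(s);
      }
    }
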
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
new file mode 100644
index 0000000..887e272
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.common.collect.ArrayListMultimap;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * A remote procedure dispatcher for regionservers.
+ */
+public class RSProcedureDispatcher
+ extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName>
+ implements ServerListener {
+ private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class);
+
+ public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY =
+ "hbase.regionserver.rpc.startup.waittime";
+ private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000;
+
+ private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1
+
+ protected final MasterServices master;
+ protected final long rsStartupWaitTime;
+
+ public RSProcedureDispatcher(final MasterServices master) {
+ super(master.getConfiguration());
+
+ this.master = master;
+ this.rsStartupWaitTime = master.getConfiguration().getLong(
+ RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
+ }
+
+ @Override
+ public boolean start() {
+ if (!super.start()) {
+ return false;
+ }
+
+ master.getServerManager().registerListener(this);
+ for (ServerName serverName: master.getServerManager().getOnlineServersList()) {
+ addNode(serverName);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ if (!super.stop()) {
+ return false;
+ }
+
+ master.getServerManager().unregisterListener(this);
+ return true;
+ }
+
+ @Override
+ protected void remoteDispatch(final ServerName serverName,
+ final Set<RemoteProcedure> operations) {
+ final int rsVersion = master.getAssignmentManager().getServerVersion(serverName);
+ if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) {
+ LOG.info(String.format(
+ "Using procedure batch rpc execution for serverName=%s version=%s",
+ serverName, rsVersion));
+ submitTask(new ExecuteProceduresRemoteCall(serverName, operations));
+ } else {
+ LOG.info(String.format(
+ "Fallback to compat rpc execution for serverName=%s version=%s",
+ serverName, rsVersion));
+ submitTask(new CompatRemoteProcedureResolver(serverName, operations));
+ }
+ }
+
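+ /**
+ * Fails every pending remote procedure queued against the given server;
+ * called when that server is no longer online to receive them.
+ */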
+ protected void abortPendingOperations(final ServerName serverName,
+ final Set<RemoteProcedure> operations) {
+ // TODO: Replace with a ServerNotOnlineException()
+ final IOException e = new DoNotRetryIOException("server not online " + serverName);
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ for (RemoteProcedure proc: operations) {
+ proc.remoteCallFailed(env, serverName, e);
+ }
+ }
+
+ public void serverAdded(final ServerName serverName) {
+ addNode(serverName);
+ }
+
+ public void serverRemoved(final ServerName serverName) {
+ removeNode(serverName);
+ }
+
+ /**
+ * Base remote call
+ */
+ protected abstract class AbstractRSRemoteCall implements Callable<Void> {
+ private final ServerName serverName;
+
+ private int numberOfAttemptsSoFar = 0;
+ private long maxWaitTime = -1;
+
+ public AbstractRSRemoteCall(final ServerName serverName) {
+ this.serverName = serverName;
+ }
+
+ public abstract Void call();
+
+ protected AdminService.BlockingInterface getRsAdmin() throws IOException {
+ final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName);
+ if (admin == null) {
+ throw new IOException("Attempting to send RPC to server " + getServerName() +
+ " failed because no RPC connection was found to this server");
+ }
+ return admin;
+ }
+
+ protected ServerName getServerName() {
+ return serverName;
+ }
+
+ protected boolean scheduleForRetry(final IOException e) {
+ // Should we wait a little before retrying? If the server is starting, the answer is yes.
+ final boolean hold = (e instanceof ServerNotRunningYetException);
+ if (hold) {
+ LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d",
+ serverName, numberOfAttemptsSoFar), e);
+ long now = EnvironmentEdgeManager.currentTime();
+ if (now < getMaxWaitTime()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("server is not yet up; waiting up to %dms",
+ (getMaxWaitTime() - now)), e);
+ }
+ submitTask(this, 100, TimeUnit.MILLISECONDS);
+ return true;
+ }
+
+ LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e);
+ return false;
+ }
+
+ // In case the socket timed out but the region server is still online,
+ // the openRegion RPC may have been accepted by the server and only
+ // the response failed to make it back. So we retry the open on the
+ // same server.
+ final boolean retry = !hold && (e instanceof SocketTimeoutException
+ && master.getServerManager().isServerOnline(serverName));
+ if (retry) {
+ // we want to retry as many times as needed as long as the RS is not dead.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Retrying to same RegionServer %s because: %s",
+ serverName, e.getMessage()), e);
+ }
+ submitTask(this);
+ return true;
+ }
+
+ // trying to send the request elsewhere instead
+ LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d",
+ serverName, numberOfAttemptsSoFar), e);
+ return false;
+ }
+
+ private long getMaxWaitTime() {
+ if (this.maxWaitTime < 0) {
+ // Lazily initialize the deadline: now plus the configured RS startup wait time.
+ this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime;
+ }
+ return this.maxWaitTime;
+ }
+
+ protected IOException unwrapException(IOException e) {
+ if (e instanceof RemoteException) {
+ e = ((RemoteException)e).unwrapRemoteException();
+ }
+ return e;
+ }
+ }
+
+ private interface RemoteProcedureResolver {
+ void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
+ void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
+ }
+
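+ /**
+ * Fetches the {@link RemoteOperation}s from the given procedures, groups them
+ * by type, and hands the open and close batches to the given
+ * {@link RemoteProcedureResolver}.
+ */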
+ public void splitAndResolveOperation(final ServerName serverName,
+ final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
+ buildAndGroupRequestByType(env, serverName, operations);
+
+ final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
+ if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps);
+
+ final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
+ if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps);
+
+ if (!reqsByType.isEmpty()) {
+ LOG.warn("unknown request type in the queue: " + reqsByType);
+ }
+ }
+
+ // ==========================================================================
+ //  ExecuteProcedures (batch) call
+ // ==========================================================================
+ protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall
+ implements RemoteProcedureResolver {
+ private final Set<RemoteProcedure> operations;
+
+ private ExecuteProceduresRequest.Builder request = null;
+
+ public ExecuteProceduresRemoteCall(final ServerName serverName,
+ final Set<RemoteProcedure> operations) {
+ super(serverName);
+ this.operations = operations;
+ }
+
+ public Void call() {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+
+ request = ExecuteProceduresRequest.newBuilder();
+ splitAndResolveOperation(getServerName(), operations, this);
+
+ try {
+ final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
+ remoteCallCompleted(env, response);
+ } catch (IOException e) {
+ e = unwrapException(e);
+ // TODO: In the future some operation may want to bail out early.
+ // TODO: How many times should we retry? (use numberOfAttemptsSoFar)
+ if (!scheduleForRetry(e)) {
+ remoteCallFailed(env, e);
+ }
+ }
+ return null;
+ }
+
+ public void dispatchOpenRequests(final MasterProcedureEnv env,
+ final List<RegionOpenOperation> operations) {
+ request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
+ }
+
+ public void dispatchCloseRequests(final MasterProcedureEnv env,
+ final List<RegionCloseOperation> operations) {
+ for (RegionCloseOperation op: operations) {
+ request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
+ }
+ }
+
+ protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
+ final ExecuteProceduresRequest request) throws IOException {
+ try {
+ return getRsAdmin().executeProcedures(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+
+ private void remoteCallCompleted(final MasterProcedureEnv env,
+ final ExecuteProceduresResponse response) {
+ /*
+ for (RemoteProcedure proc: operations) {
+ proc.remoteCallCompleted(env, getServerName(), response);
+ }*/
+ }
+
+ private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+ for (RemoteProcedure proc: operations) {
+ proc.remoteCallFailed(env, getServerName(), e);
+ }
+ }
+ }
+
+ // ==========================================================================
+ // Compatibility calls
+ // Since we don't have a "batch proc-exec" request on the target RS
+ // we have to chunk the requests by type and dispatch the specific request.
+ // ==========================================================================
+ private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env,
+ final ServerName serverName, final List<RegionOpenOperation> operations) {
+ final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
+ builder.setServerStartCode(serverName.getStartcode());
+ builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
+ for (RegionOpenOperation op: operations) {
+ builder.addOpenInfo(op.buildRegionOpenInfoRequest(env));
+ }
+ return builder.build();
+ }
+
+ private final class OpenRegionRemoteCall extends AbstractRSRemoteCall {
+ private final List<RegionOpenOperation> operations;
+
+ public OpenRegionRemoteCall(final ServerName serverName,
+ final List<RegionOpenOperation> operations) {
+ super(serverName);
+ this.operations = operations;
+ }
+
+ @Override
+ public Void call() {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations);
+
+ try {
+ OpenRegionResponse response = sendRequest(getServerName(), request);
+ remoteCallCompleted(env, response);
+ } catch (IOException e) {
+ e = unwrapException(e);
+ // TODO: In the future some operation may want to bail out early.
+ // TODO: How many times should we retry? (use numberOfAttemptsSoFar)
+ if (!scheduleForRetry(e)) {
+ remoteCallFailed(env, e);
+ }
+ }
+ return null;
+ }
+
+ private OpenRegionResponse sendRequest(final ServerName serverName,
+ final OpenRegionRequest request) throws IOException {
+ try {
+ return getRsAdmin().openRegion(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+ private void remoteCallCompleted(final MasterProcedureEnv env,
+ final OpenRegionResponse response) {
+ int index = 0;
+ for (RegionOpenOperation op: operations) {
+ OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
+ op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
+ op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
+ }
+ }
+
+ private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+ for (RegionOpenOperation op: operations) {
+ op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+ }
+ }
+ }
+
+ private final class CloseRegionRemoteCall extends AbstractRSRemoteCall {
+ private final RegionCloseOperation operation;
+
+ public CloseRegionRemoteCall(final ServerName serverName,
+ final RegionCloseOperation operation) {
+ super(serverName);
+ this.operation = operation;
+ }
+
+ @Override
+ public Void call() {
+ final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName());
+ try {
+ CloseRegionResponse response = sendRequest(getServerName(), request);
+ remoteCallCompleted(env, response);
+ } catch (IOException e) {
+ e = unwrapException(e);
+ // TODO: In the future some operation may want to bail out early.
+ // TODO: How many times should we retry? (use numberOfAttemptsSoFar)
+ if (!scheduleForRetry(e)) {
+ remoteCallFailed(env, e);
+ }
+ }
+ return null;
+ }
+
+ private CloseRegionResponse sendRequest(final ServerName serverName,
+ final CloseRegionRequest request) throws IOException {
+ try {
+ return getRsAdmin().closeRegion(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+
+ private void remoteCallCompleted(final MasterProcedureEnv env,
+ final CloseRegionResponse response) {
+ operation.setClosed(response.getClosed());
+ operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
+ }
+
+ private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
+ operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
+ }
+ }
+
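+ /**
+ * Compatibility path for RegionServers without the batch ExecuteProcedures
+ * RPC: splits the operations by type and dispatches individual open/close calls.
+ */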
+ protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver {
+ private final Set<RemoteProcedure> operations;
+ private final ServerName serverName;
+
+ public CompatRemoteProcedureResolver(final ServerName serverName,
+ final Set<RemoteProcedure> operations) {
+ this.serverName = serverName;
+ this.operations = operations;
+ }
+
+ @Override
+ public Void call() {
+ splitAndResolveOperation(serverName, operations, this);
+ return null;
+ }
+
+ public void dispatchOpenRequests(final MasterProcedureEnv env,
+ final List<RegionOpenOperation> operations) {
+ submitTask(new OpenRegionRemoteCall(serverName, operations));
+ }
+
+ public void dispatchCloseRequests(final MasterProcedureEnv env,
+ final List<RegionCloseOperation> operations) {
+ for (RegionCloseOperation op: operations) {
+ submitTask(new CloseRegionRemoteCall(serverName, op));
+ }
+ }
+ }
+
+ // ==========================================================================
+ // RPC Messages
+ // - ServerOperation: refreshConfig, grant, revoke, ...
+ // - RegionOperation: open, close, flush, snapshot, ...
+ // ==========================================================================
+ public static abstract class ServerOperation extends RemoteOperation {
+ protected ServerOperation(final RemoteProcedure remoteProcedure) {
+ super(remoteProcedure);
+ }
+ }
+
+ public static abstract class RegionOperation extends RemoteOperation {
+ private final HRegionInfo regionInfo;
+
+ protected RegionOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo) {
+ super(remoteProcedure);
+ this.regionInfo = regionInfo;
+ }
+
+ public HRegionInfo getRegionInfo() {
+ return this.regionInfo;
+ }
+ }
+
+ public static class RegionOpenOperation extends RegionOperation {
+ private final List<ServerName> favoredNodes;
+ private final boolean openForReplay;
+ private boolean failedOpen;
+
+ public RegionOpenOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo, final List<ServerName> favoredNodes,
+ final boolean openForReplay) {
+ super(remoteProcedure, regionInfo);
+ this.favoredNodes = favoredNodes;
+ this.openForReplay = openForReplay;
+ }
+
+ protected void setFailedOpen(final boolean failedOpen) {
+ this.failedOpen = failedOpen;
+ }
+
+ public boolean isFailedOpen() {
+ return failedOpen;
+ }
+
+ public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest(
+ final MasterProcedureEnv env) {
+ return RequestConverter.buildRegionOpenInfo(getRegionInfo(),
+ env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+ }
+ }
+
+ public static class RegionCloseOperation extends RegionOperation {
+ private final ServerName destinationServer;
+ private boolean closed = false;
+
+ public RegionCloseOperation(final RemoteProcedure remoteProcedure,
+ final HRegionInfo regionInfo, final ServerName destinationServer) {
+ super(remoteProcedure, regionInfo);
+ this.destinationServer = destinationServer;
+ }
+
+ public ServerName getDestinationServer() {
+ return destinationServer;
+ }
+
+ protected void setClosed(final boolean closed) {
+ this.closed = closed;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) {
+ return ProtobufUtil.buildCloseRegionRequest(serverName,
+ getRegionInfo().getRegionName(), getDestinationServer());
+ }
+ }
+}
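
A rough lifecycle sketch of the class above, assuming a MasterServices handle named master
(paraphrased wiring, not code from this patch):

    // Hypothetical wiring; in practice the master owns the dispatcher.
    RSProcedureDispatcher dispatcher = new RSProcedureDispatcher(master);
    dispatcher.start();  // registers as ServerListener; adds a node per online RS
    // ... procedures enqueue RemoteOperations; the base RemoteProcedureDispatcher
    // calls remoteDispatch(serverName, operations), which batches them via the
    // ExecuteProcedures RPC on 2.1+ RegionServers or falls back to per-type
    // open/close RPCs through CompatRemoteProcedureResolver ...
    dispatcher.stop();   // unregisters the ServerListener
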
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
index 21709f8..cfd9df9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
-import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -416,17 +415,7 @@ public class RestoreSnapshotProcedure
try {
Connection conn = env.getMasterServices().getConnection();
- // 1. Forces all the RegionStates to be offline
- //
- // The AssignmentManager keeps all the region states around
- // with no possibility to remove them, until the master is restarted.
- // This means that a region marked as SPLIT before the restore will never be assigned again.
- // To avoid having all states around all the regions are switched to the OFFLINE state,
- // which is the same state that the regions will be after a delete table.
- forceRegionsOffline(env, regionsToAdd);
- forceRegionsOffline(env, regionsToRestore);
- forceRegionsOffline(env, regionsToRemove);
-
+ // 1. Prepare to restore
getMonitorStatus().setStatus("Preparing to restore each region");
// 2. Applies changes to hbase:meta
@@ -496,20 +485,6 @@ public class RestoreSnapshotProcedure
}
/**
- * Make sure that region states of the region list is in OFFLINE state.
- * @param env MasterProcedureEnv
- * @param hris region info list
- **/
- private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
- RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
- if (hris != null) {
- for (HRegionInfo hri: hris) {
- states.regionOffline(hri);
- }
- }
- }
-
- /**
* The procedure could be restarted from a different machine. If the variable is null, we need to
* retrieve it.
* @return traceEnabled
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
new file mode 100644
index 0000000..ca351f69
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Passed as an exception by {@link ServerCrashProcedure}
+ * to notify on-going RITs that the server has failed.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class ServerCrashException extends HBaseIOException {
+ private final long procId;
+ private final ServerName serverName;
+
+ /**
+ * @param procId Id of the ServerCrashProcedure processing the crash.
+ * @param serverName The server that crashed.
+ */
+ public ServerCrashException(long procId, ServerName serverName) {
+ this.procId = procId;
+ this.serverName = serverName;
+ }
+
+ @Override
+ public String getMessage() {
+ return "ServerCrashProcedure pid=" + this.procId + ", server=" + this.serverName;
+ }
+}
\ No newline at end of file
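
Usage shows up later in this same patch: ServerCrashProcedure#handleRIT builds one
ServerCrashException per crashed server and hands it to each region-in-transition via
remoteCallFailed. Condensed from that call site:

    // rtp is a RegionTransitionProcedure found RIT against the crashed server.
    ServerCrashException sce = new ServerCrashException(getProcId(), getServerName());
    rtp.remoteCallFailed(env, serverName, sce);  // wake the RIT so it can re-plan
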
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 2703947..71c6b89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -19,55 +19,40 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
-import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
+import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
+import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.KeeperException;
/**
* Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called
* ServerShutdownHandler.
*
- * <p>The procedure flow varies dependent on whether meta is assigned, if we are
- * doing distributed log replay versus distributed log splitting, and if we are to split logs at
- * all.
- *
- * <p>This procedure asks that all crashed servers get processed equally; we yield after the
- * completion of each successful flow step. We do this so that we do not 'deadlock' waiting on
- * a region assignment so we can replay edits which could happen if a region moved there are edits
- * on two servers for replay.
+ * <p>The procedure flow varies depending on whether meta is assigned and if we are to split logs.
*
- * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2.
- * TODO: We do not have special handling for system tables.
+ * <p>We come in here after ServerManager has noticed a server has expired. Procedures
+ * queued on the rpc should have been notified of the failure and should be concurrently
+ * getting themselves ready to assign elsewhere.
*/
public class ServerCrashProcedure
extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
@@ -75,36 +60,6 @@ implements ServerProcedureInterface {
private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class);
/**
- * Configuration key to set how long to wait in ms doing a quick check on meta state.
- */
- public static final String KEY_SHORT_WAIT_ON_META =
- "hbase.master.servercrash.short.wait.on.meta.ms";
-
- public static final int DEFAULT_SHORT_WAIT_ON_META = 1000;
-
- /**
- * Configuration key to set how many retries to cycle before we give up on meta.
- * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds.
- */
- public static final String KEY_RETRIES_ON_META =
- "hbase.master.servercrash.meta.retries";
-
- public static final int DEFAULT_RETRIES_ON_META = 10;
-
- /**
- * Configuration key to set how long to wait in ms on regions in transition.
- */
- public static final String KEY_WAIT_ON_RIT =
- "hbase.master.servercrash.wait.on.rit.ms";
-
- public static final int DEFAULT_WAIT_ON_RIT = 30000;
-
- private static final Set<HRegionInfo> META_REGION_SET = new HashSet<>();
- static {
- META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO);
- }
-
- /**
* Name of the crashed server to process.
*/
private ServerName serverName;
@@ -117,14 +72,8 @@ implements ServerProcedureInterface {
/**
* Regions that were on the crashed server.
*/
- private Set<HRegionInfo> regionsOnCrashedServer;
+ private List<HRegionInfo> regionsOnCrashedServer;
- /**
- * Regions assigned. Usually some subset of {@link #regionsOnCrashedServer}.
- */
- private List<HRegionInfo> regionsAssigned;
-
- private boolean distributedLogReplay = false;
private boolean carryingMeta = false;
private boolean shouldSplitWal;
@@ -164,20 +113,11 @@ implements ServerProcedureInterface {
super();
}
- private void throwProcedureYieldException(final String msg) throws ProcedureYieldException {
- String logMsg = msg + "; cycle=" + this.cycles + ", running for " +
- StringUtils.formatTimeDiff(System.currentTimeMillis(), getSubmittedTime());
- // The procedure executor logs ProcedureYieldException at trace level. For now, log these
- // yields for server crash processing at DEBUG. Revisit when stable.
- if (LOG.isDebugEnabled()) LOG.debug(logMsg);
- throw new ProcedureYieldException(logMsg);
- }
-
@Override
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
- throws ProcedureYieldException {
+ throws ProcedureSuspendedException, ProcedureYieldException {
if (LOG.isTraceEnabled()) {
- LOG.trace(state);
+ LOG.trace(state + " " + this + "; cycles=" + this.cycles);
}
// Keep running count of cycles
if (state.ordinal() != this.previousState) {
@@ -186,11 +126,7 @@ implements ServerProcedureInterface {
} else {
this.cycles++;
}
- MasterServices services = env.getMasterServices();
- // Is master fully online? If not, yield. No processing of servers unless master is up
- if (!services.getAssignmentManager().isFailoverCleanupDone()) {
- throwProcedureYieldException("Waiting on master failover to complete");
- }
+ final MasterServices services = env.getMasterServices();
// HBASE-14802
// If we have not yet notified that we are processing a dead server, we should do now.
if (!notifiedDeadServer) {
@@ -201,102 +137,61 @@ implements ServerProcedureInterface {
try {
switch (state) {
case SERVER_CRASH_START:
- LOG.info("Start processing crashed " + this.serverName);
+ LOG.info("Start " + this);
start(env);
// If carrying meta, process it first. Else, get list of regions on crashed server.
- if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
- else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+ if (this.carryingMeta) {
+ setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+ } else {
+ setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
+ }
break;
case SERVER_CRASH_GET_REGIONS:
// If hbase:meta is not assigned, yield.
- if (!isMetaAssignedQuickTest(env)) {
- // isMetaAssignedQuickTest does not really wait. Let's delay a little before
- // another round of execution.
- long wait =
- env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META,
- DEFAULT_SHORT_WAIT_ON_META);
- wait = wait / 10;
- Thread.sleep(wait);
- throwProcedureYieldException("Waiting on hbase:meta assignment");
+ if (env.getAssignmentManager().waitMetaInitialized(this)) {
+ throw new ProcedureSuspendedException();
}
- this.regionsOnCrashedServer =
- services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName);
- // Where to go next? Depends on whether we should split logs at all or if we should do
- // distributed log splitting (DLS) vs distributed log replay (DLR).
+
+ this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates()
+ .getServerRegionInfoSet(serverName);
+ // Where to go next? Depends on whether we should split logs at all or
+ // if we should do distributed log splitting.
if (!this.shouldSplitWal) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
- } else if (this.distributedLogReplay) {
- setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY);
} else {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
}
break;
case SERVER_CRASH_PROCESS_META:
- // If we fail processing hbase:meta, yield.
- if (!processMeta(env)) {
- throwProcedureYieldException("Waiting on regions-in-transition to clear");
- }
+ processMeta(env);
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
break;
- case SERVER_CRASH_PREPARE_LOG_REPLAY:
- prepareLogReplay(env, this.regionsOnCrashedServer);
- setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
- break;
-
case SERVER_CRASH_SPLIT_LOGS:
splitLogs(env);
- // If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN
- if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH);
- else setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
break;
case SERVER_CRASH_ASSIGN:
- List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env);
-
- // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning
- // which is what we are mimicing here but it looks prone to double assignment if assign
- // fails midway. TODO: Test.
-
// If no regions to assign, skip assign and skip to the finish.
- boolean regions = regionsToAssign != null && !regionsToAssign.isEmpty();
- if (regions) {
- this.regionsAssigned = regionsToAssign;
- if (!assign(env, regionsToAssign)) {
- throwProcedureYieldException("Failed assign; will retry");
+ // Filter out meta regions. Those are handled elsewhere in this procedure.
+ // Filter changes this.regionsOnCrashedServer.
+ if (filterDefaultMetaRegions(regionsOnCrashedServer)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Assigning regions " +
+ HRegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this +
+ "; cycles=" + this.cycles);
}
+ handleRIT(env, regionsOnCrashedServer);
+ addChildProcedure(env.getAssignmentManager().
+ createAssignProcedures(regionsOnCrashedServer, true));
}
- if (this.shouldSplitWal && distributedLogReplay) {
- // Take this route even if there are apparently no regions assigned. This may be our
- // second time through here; i.e. we assigned and crashed just about here. On second
- // time through, there will be no regions because we assigned them in the previous step.
- // Even though no regions, we need to go through here to clean up the DLR zk markers.
- setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN);
- } else {
- setNextState(ServerCrashState.SERVER_CRASH_FINISH);
- }
- break;
-
- case SERVER_CRASH_WAIT_ON_ASSIGN:
- // TODO: The list of regionsAssigned may be more than we actually assigned. See down in
- // AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where where we
- // will skip assigning a region because it is/was on a dead server. Should never happen!
- // It was on this server. Worst comes to worst, we'll still wait here till other server is
- // processed.
-
- // If the wait on assign failed, yield -- if we have regions to assign.
- if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
- if (!waitOnAssign(env, this.regionsAssigned)) {
- throwProcedureYieldException("Waiting on region assign");
- }
- }
- setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+ setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
case SERVER_CRASH_FINISH:
- LOG.info("Finished processing of crashed " + serverName);
services.getServerManager().getDeadServers().finish(serverName);
return Flow.NO_MORE_STATE;
@@ -304,11 +199,7 @@ implements ServerProcedureInterface {
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
- LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e);
- } catch (InterruptedException e) {
- // TODO: Make executor allow IEs coming up out of execute.
- LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e);
- Thread.currentThread().interrupt();
+ LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + this.cycles, e);
}
return Flow.HAS_MORE_STATE;
}
@@ -318,96 +209,60 @@ implements ServerProcedureInterface {
* @param env
* @throws IOException
*/
- private void start(final MasterProcedureEnv env) throws IOException {
- MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
- // Set recovery mode late. This is what the old ServerShutdownHandler used do.
- mwm.setLogRecoveryMode();
- this.distributedLogReplay = mwm.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY;
- }
+ private void start(final MasterProcedureEnv env) throws IOException {}
/**
* @param env
- * @return False if we fail to assign and split logs on meta ('process').
* @throws IOException
* @throws InterruptedException
*/
- private boolean processMeta(final MasterProcedureEnv env)
- throws IOException {
+ private void processMeta(final MasterProcedureEnv env) throws IOException {
if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName);
- MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
+
if (this.shouldSplitWal) {
- if (this.distributedLogReplay) {
- prepareLogReplay(env, META_REGION_SET);
- } else {
- // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
- mwm.splitMetaLog(serverName);
- am.getRegionStates().logSplit(metaHRI);
- }
+ // TODO: Matteo. We BLOCK here, but this is the most important thing to be doing at this moment.
+ env.getMasterServices().getMasterWalManager().splitMetaLog(serverName);
}
// Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout
- boolean processed = true;
- if (am.isCarryingMeta(serverName)) {
- // TODO: May block here if hard time figuring state of meta.
- am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
- verifyAndAssignMetaWithRetries(env);
- if (this.shouldSplitWal && distributedLogReplay) {
- int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
- if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) {
- processed = false;
- } else {
- // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment.
- mwm.splitMetaLog(serverName);
- }
- }
+ final AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ for (HRegionInfo hri: am.getRegionStates().getServerRegionInfoSet(serverName)) {
+ if (!isDefaultMetaRegion(hri)) continue;
+
+ am.offlineRegion(hri);
+ addChildProcedure(am.createAssignProcedure(hri, true));
}
- return processed;
}
- /**
- * @return True if region cleared RIT, else false if we timed out waiting.
- * @throws InterruptedIOException
- */
- private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am,
- final HRegionInfo hri, final int timeout)
- throws InterruptedIOException {
- try {
- if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) {
- // Wait here is to avoid log replay hits current dead server and incur a RPC timeout
- // when replay happens before region assignment completes.
- LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time");
- return false;
+ private boolean filterDefaultMetaRegions(final List<HRegionInfo> regions) {
+ if (regions == null) return false;
+ final Iterator<HRegionInfo> it = regions.iterator();
+ while (it.hasNext()) {
+ final HRegionInfo hri = it.next();
+ if (isDefaultMetaRegion(hri)) {
+ it.remove();
}
- } catch (InterruptedException ie) {
- throw new InterruptedIOException("Caught " + ie +
- " during waitOnRegionToClearRegionsInTransition for " + hri);
}
- return true;
+ return !regions.isEmpty();
}
- private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions)
- throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " regions-in-recovery from " +
- this.serverName);
- }
- MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- mwm.prepareLogReplay(this.serverName, regions);
- am.getRegionStates().logSplit(this.serverName);
+ private boolean isDefaultMetaRegion(final HRegionInfo hri) {
+ return hri.getTable().equals(TableName.META_TABLE_NAME) &&
+ RegionReplicaUtil.isDefaultReplica(hri);
}
private void splitLogs(final MasterProcedureEnv env) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Splitting logs from " + serverName + "; region count=" +
- size(this.regionsOnCrashedServer));
+ LOG.debug("Splitting WALs " + this);
}
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
// TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running.
+ // PROBLEM!!! WE BLOCK HERE.
mwm.splitLog(this.serverName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done splitting WALs " + this);
+ }
am.getRegionStates().logSplit(this.serverName);
}
@@ -415,124 +270,6 @@ implements ServerProcedureInterface {
return hris == null? 0: hris.size();
}
- /**
- * Figure out what we need to assign. Should be idempotent.
- * @param env
- * @return List of calculated regions to assign; may be empty or null.
- * @throws IOException
- */
- private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env)
- throws IOException {
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- List<HRegionInfo> regionsToAssignAggregator = new ArrayList<>();
- int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM,
- HConstants.DEFAULT_META_REPLICA_NUM);
- for (int i = 1; i < replicaCount; i++) {
- HRegionInfo metaHri =
- RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
- if (am.isCarryingMetaReplica(this.serverName, metaHri)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName);
- }
- regionsToAssignAggregator.add(metaHri);
- }
- }
- // Clean out anything in regions in transition.
- List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) +
- " region(s) that " + (serverName == null? "null": serverName) +
- " was carrying (and " + regionsInTransition.size() +
- " regions(s) that were opening on this server)");
- }
- regionsToAssignAggregator.addAll(regionsInTransition);
-
- // Iterate regions that were on this server and figure which of these we need to reassign
- if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
- RegionStates regionStates = am.getRegionStates();
- for (HRegionInfo hri: this.regionsOnCrashedServer) {
- if (regionsInTransition.contains(hri)) continue;
- String encodedName = hri.getEncodedName();
- Lock lock = am.acquireRegionLock(encodedName);
- try {
- RegionState rit = regionStates.getRegionTransitionState(hri);
- if (processDeadRegion(hri, am)) {
- ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
- if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
- // If this region is in transition on the dead server, it must be
- // opening or pending_open, which should have been covered by
- // AM#cleanOutCrashedServerReferences
- LOG.info("Skip assigning " + hri.getRegionNameAsString()
- + " because opened on " + addressFromAM.getServerName());
- continue;
- }
- if (rit != null) {
- if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) {
- // Skip regions that are in transition on other server
- LOG.info("Skip assigning region in transition on other server" + rit);
- continue;
- }
- LOG.info("Reassigning region " + rit + " and clearing zknode if exists");
- regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
- } else if (regionStates.isRegionInState(
- hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) {
- regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
- }
- regionsToAssignAggregator.add(hri);
- // TODO: The below else if is different in branch-1 from master branch.
- } else if (rit != null) {
- if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
- && am.getTableStateManager().isTableState(hri.getTable(),
- TableState.State.DISABLED, TableState.State.DISABLING) ||
- am.getReplicasToClose().contains(hri)) {
- // If the table was partially disabled and the RS went down, we should clear the
- // RIT and remove the node for the region.
- // The rit that we use may be stale in case the table was in DISABLING state
- // but though we did assign we will not be clearing the znode in CLOSING state.
- // Doing this will have no harm. See HBASE-5927
- regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
- am.offlineDisabledRegion(hri);
- } else {
- LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
- + rit + " not to be assigned by SSH of server " + serverName);
- }
- }
- } finally {
- lock.unlock();
- }
- }
- }
- return regionsToAssignAggregator;
- }
-
- private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
- throws InterruptedIOException {
- AssignmentManager am = env.getMasterServices().getAssignmentManager();
- try {
- am.assign(hris);
- } catch (InterruptedException ie) {
- LOG.error("Caught " + ie + " during round-robin assignment");
- throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
- } catch (IOException ioe) {
- LOG.info("Caught " + ioe + " during region assignment, will retry");
- return false;
- }
- return true;
- }
-
- private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris)
- throws InterruptedIOException {
- int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT);
- for (HRegionInfo hri: hris) {
- // TODO: Blocks here.
- if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(),
- hri, timeout)) {
- return false;
- }
- }
- return true;
- }
-
@Override
protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
throws IOException {
@@ -580,11 +317,11 @@ implements ServerProcedureInterface {
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
- sb.append(" serverName=");
- sb.append(this.serverName);
- sb.append(", shouldSplitWal=");
+ sb.append(" server=");
+ sb.append(serverName);
+ sb.append(", splitWal=");
sb.append(shouldSplitWal);
- sb.append(", carryingMeta=");
+ sb.append(", meta=");
sb.append(carryingMeta);
}
@@ -595,7 +332,6 @@ implements ServerProcedureInterface {
MasterProcedureProtos.ServerCrashStateData.Builder state =
MasterProcedureProtos.ServerCrashStateData.newBuilder().
setServerName(ProtobufUtil.toServerName(this.serverName)).
- setDistributedLogReplay(this.distributedLogReplay).
setCarryingMeta(this.carryingMeta).
setShouldSplitWal(this.shouldSplitWal);
if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) {
@@ -603,11 +339,6 @@ implements ServerProcedureInterface {
state.addRegionsOnCrashedServer(HRegionInfo.convert(hri));
}
}
- if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
- for (HRegionInfo hri: this.regionsAssigned) {
- state.addRegionsAssigned(HRegionInfo.convert(hri));
- }
- }
state.build().writeDelimitedTo(stream);
}
@@ -618,142 +349,16 @@ implements ServerProcedureInterface {
MasterProcedureProtos.ServerCrashStateData state =
MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream);
this.serverName = ProtobufUtil.toServerName(state.getServerName());
- this.distributedLogReplay = state.hasDistributedLogReplay()?
- state.getDistributedLogReplay(): false;
this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): false;
// shouldSplitWAL has a default over in pb so this invocation will always work.
this.shouldSplitWal = state.getShouldSplitWal();
int size = state.getRegionsOnCrashedServerCount();
if (size > 0) {
- this.regionsOnCrashedServer = new HashSet<>(size);
+ this.regionsOnCrashedServer = new ArrayList<HRegionInfo>(size);
for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
this.regionsOnCrashedServer.add(HRegionInfo.convert(ri));
}
}
- size = state.getRegionsAssignedCount();
- if (size > 0) {
- this.regionsAssigned = new ArrayList<>(size);
- for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
- this.regionsAssigned.add(HRegionInfo.convert(ri));
- }
- }
- }
-
- /**
- * Process a dead region from a dead RS. Checks if the region is disabled or
- * disabling or if the region has a partially completed split.
- * @param hri
- * @param assignmentManager
- * @return Returns true if specified region should be assigned, false if not.
- * @throws IOException
- */
- private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager)
- throws IOException {
- boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
- if (!tablePresent) {
- LOG.info("The table " + hri.getTable() + " was deleted. Hence not proceeding.");
- return false;
- }
- // If table is not disabled but the region is offlined,
- boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
- TableState.State.DISABLED);
- if (disabled){
- LOG.info("The table " + hri.getTable() + " was disabled. Hence not proceeding.");
- return false;
- }
- if (hri.isOffline() && hri.isSplit()) {
- // HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation.
- // If the meta scanner saw the parent split, then it should see the daughters as assigned
- // to the dead server. We don't have to do anything.
- return false;
- }
- boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
- TableState.State.DISABLING);
- if (disabling) {
- LOG.info("The table " + hri.getTable() + " is disabled. Hence not assigning region" +
- hri.getEncodedName());
- return false;
- }
- return true;
- }
-
- /**
- * If hbase:meta is not assigned already, assign.
- * @throws IOException
- */
- private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException {
- MasterServices services = env.getMasterServices();
- int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META);
- // Just reuse same time as we have for short wait on meta. Adding another config is overkill.
- long waitTime =
- services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
- int iFlag = 0;
- while (true) {
- try {
- verifyAndAssignMeta(env);
- break;
- } catch (KeeperException e) {
- services.abort("In server shutdown processing, assigning meta", e);
- throw new IOException("Aborting", e);
- } catch (Exception e) {
- if (iFlag >= iTimes) {
- services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e);
- throw new IOException("Aborting", e);
- }
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e1) {
- LOG.warn("Interrupted when is the thread sleep", e1);
- Thread.currentThread().interrupt();
- throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
- }
- iFlag++;
- }
- }
- }
-
- /**
- * If hbase:meta is not assigned already, assign.
- * @throws InterruptedException
- * @throws IOException
- * @throws KeeperException
- */
- private void verifyAndAssignMeta(final MasterProcedureEnv env)
- throws InterruptedException, IOException, KeeperException {
- MasterServices services = env.getMasterServices();
- if (!isMetaAssignedQuickTest(env)) {
- services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
- } else if (serverName.equals(services.getMetaTableLocator().
- getMetaRegionLocation(services.getZooKeeper()))) {
- throw new IOException("hbase:meta is onlined on the dead server " + this.serverName);
- } else {
- LOG.info("Skip assigning hbase:meta because it is online at "
- + services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper()));
- }
- }
-
- /**
- * A quick test that hbase:meta is assigned; blocks for short time only.
- * @return True if hbase:meta location is available and verified as good.
- * @throws InterruptedException
- * @throws IOException
- */
- private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env)
- throws InterruptedException, IOException {
- ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper();
- MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator();
- boolean metaAssigned = false;
- // Is hbase:meta location available yet?
- if (mtl.isLocationAvailable(zkw)) {
- ClusterConnection connection = env.getMasterServices().getClusterConnection();
- // Is hbase:meta location good yet?
- long timeout =
- env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META);
- if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) {
- metaAssigned = true;
- }
- }
- return metaAssigned;
}
@Override
@@ -789,4 +394,46 @@ implements ServerProcedureInterface {
// the client does not know about this procedure.
return false;
}
-}
+
+ /**
+ * Handle any outstanding RITs that are up against this.serverName, the crashed server.
+ * Notify them of the crash. Remove assign entries from the passed-in <code>regions</code>,
+ * otherwise we have two assigns going on and they will fight over who has the lock.
+ * Notify Unassigns also.
+ * @param env The master procedure environment.
+ * @param regions Regions that were on the crashed server; assigns that were RIT against
+ *   the crashed server are removed from this list in place.
+ */
+ private void handleRIT(final MasterProcedureEnv env, final List<HRegionInfo> regions) {
+ if (regions == null) return;
+ AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ final Iterator<HRegionInfo> it = regions.iterator();
+ ServerCrashException sce = null;
+ while (it.hasNext()) {
+ final HRegionInfo hri = it.next();
+ RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri);
+ if (rtp == null) continue;
+ // Make sure the RIT is against this crashed server. In the case where there are many
+ // processings of a crashed server -- backed up for whatever reason (slow WAL split) --
+ // a previous SCP may have already failed an assign, etc., and the region may have a new
+ // target location; DO NOT fail these, else we cause assign churn.
+ ServerName rtpServerName = rtp.getServer(env);
+ if (rtpServerName == null) {
+ LOG.warn("RIT with ServerName null! " + rtp);
+ continue;
+ }
+ if (!rtpServerName.equals(this.serverName)) continue;
+ LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " +
+ rtp.getRegionState(env).toShortString());
+ // Notify RIT on server crash.
+ if (sce == null) {
+ sce = new ServerCrashException(getProcId(), getServerName());
+ }
+ rtp.remoteCallFailed(env, this.serverName, sce);
+ if (rtp instanceof AssignProcedure) {
+ // If an assign, remove it from the passed-in list of regions so it is not assigned twice.
+ it.remove();
+ }
+ }
+ }
+}
\ No newline at end of file