You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:45:54 UTC
[18/47] hbase git commit: HBASE-21565 Delete dead server from dead
server list too early leads to concurrent Server Crash Procedures (SCP) for
the same server
HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures (SCP) for the same server
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c448604c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c448604c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c448604c
Branch: refs/heads/HBASE-21512
Commit: c448604ceb987d113913f0583452b2abce04db0d
Parents: f782846
Author: Jingyun Tian <ti...@gmail.com>
Authored: Mon Dec 17 19:32:23 2018 +0800
Committer: Jingyun Tian <ti...@apache.org>
Committed: Tue Dec 18 16:57:11 2018 +0800
----------------------------------------------------------------------
.../hbase/master/RegionServerTracker.java | 3 +
.../hadoop/hbase/master/ServerManager.java | 25 ++++----
.../master/assignment/AssignmentManager.java | 28 ++++++---
.../hbase/master/assignment/RegionStates.java | 3 +-
.../hbase/master/assignment/ServerState.java | 2 +-
.../master/assignment/ServerStateNode.java | 2 +-
.../master/procedure/ServerCrashProcedure.java | 16 ++---
.../hadoop/hbase/HBaseTestingUtility.java | 7 ++-
.../hadoop/hbase/master/TestRestartCluster.java | 65 ++++++++++++++++++++
.../procedure/TestServerCrashProcedure.java | 38 ++++++++++++
10 files changed, 155 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
index f419732..9d33a21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
@@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener {
// '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
+ // Create a ServerNode for all possible live servers found in the WAL directory
+ liveServersFromWALDir.stream()
+ .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
watcher.registerListener(this);
synchronized (this) {
List<String> servers =
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index dc76d72..86d72d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -602,19 +602,22 @@ public class ServerManager {
return false;
}
LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
- master.getAssignmentManager().submitServerCrash(serverName, true);
-
- // Tell our listeners that a server was removed
- if (!this.listeners.isEmpty()) {
- for (ServerListener listener : this.listeners) {
- listener.serverRemoved(serverName);
+ long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
+ if(pid <= 0) {
+ return false;
+ } else {
+ // Tell our listeners that a server was removed
+ if (!this.listeners.isEmpty()) {
+ for (ServerListener listener : this.listeners) {
+ listener.serverRemoved(serverName);
+ }
}
+ // trigger a persist of flushedSeqId
+ if (flushedSeqIdFlusher != null) {
+ flushedSeqIdFlusher.triggerNow();
+ }
+ return true;
}
- // trigger a persist of flushedSeqId
- if (flushedSeqIdFlusher != null) {
- flushedSeqIdFlusher.triggerNow();
- }
- return true;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index a564ea9..b7c2203 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1343,24 +1343,36 @@ public class AssignmentManager {
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
boolean carryingMeta;
long pid;
- ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+ ServerStateNode serverNode = regionStates.getServerNode(serverName);
+ if(serverNode == null){
+ LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
+ return -1;
+ }
// we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
// server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
// this server. This is used to simplify the implementation for TRSP and SCP, where we can make
// sure that, the region list fetched by SCP will not be changed any more.
serverNode.writeLock().lock();
try {
- serverNode.setState(ServerState.CRASHED);
- carryingMeta = isCarryingMeta(serverName);
ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
- pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
- shouldSplitWal, carryingMeta));
+ carryingMeta = isCarryingMeta(serverName);
+ if (!serverNode.isInState(ServerState.ONLINE)) {
+ LOG.info(
+ "Skip to add SCP for {} with meta= {}, " +
+ "since there should be a SCP is processing or already done for this server node",
+ serverName, carryingMeta);
+ return -1;
+ } else {
+ serverNode.setState(ServerState.CRASHED);
+ pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+ serverName, shouldSplitWal, carryingMeta));
+ LOG.info(
+ "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
+ serverName, carryingMeta, pid);
+ }
} finally {
serverNode.writeLock().unlock();
}
- LOG.info(
- "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
- serverName, carryingMeta, pid);
return pid;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 7b85409..1470a5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -738,7 +738,8 @@ public class RegionStates {
serverMap.remove(serverName);
}
- ServerStateNode getServerNode(final ServerName serverName) {
+ @VisibleForTesting
+ public ServerStateNode getServerNode(final ServerName serverName) {
return serverMap.get(serverName);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
index 3efe6e2..c86a60e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Server State.
*/
@InterfaceAudience.Private
-enum ServerState {
+public enum ServerState {
/**
* Initial state. Available.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
index 6f763aa..11883db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* State of Server; list of hosted regions, etc.
*/
@InterfaceAudience.Private
-class ServerStateNode implements Comparable<ServerStateNode> {
+public class ServerStateNode implements Comparable<ServerStateNode> {
private final Set<RegionStateNode> regions;
private final ServerName serverName;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index b93f8fa..05bcd28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -333,17 +333,6 @@ public class ServerCrashProcedure
return ServerOperationType.CRASH_HANDLER;
}
- /**
- * For this procedure, yield at end of each successful flow step so that all crashed servers
- * can make progress rather than do the default which has each procedure running to completion
- * before we move to the next. For crashed servers, especially if running with distributed log
- * replay, we will want all servers to come along; we do not want the scenario where a server is
- * stuck waiting for regions to online so it can replay edits.
- */
- @Override
- protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
- return true;
- }
@Override
protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
@@ -390,4 +379,9 @@ public class ServerCrashProcedure
protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
}
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0cd5a22..7bfbfe1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* @param servers number of region servers
*/
public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
+ this.restartHBaseCluster(servers, null);
+ }
+
+ public void restartHBaseCluster(int servers, List<Integer> ports)
+ throws IOException, InterruptedException {
if (hbaseAdmin != null) {
hbaseAdmin.close();
hbaseAdmin = null;
@@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.connection.close();
this.connection = null;
}
- this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
+ this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
// Don't leave here till we've done a successful scan of the hbase:meta
Connection conn = ConnectionFactory.createConnection(this.conf);
Table t = conn.getTable(TableName.META_TABLE_NAME);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 4ba1876..e55e375 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -67,6 +75,63 @@ public class TestRestartCluster {
}
@Test
+ public void testClusterRestartFailOver() throws Exception {
+ UTIL.startMiniCluster(3);
+ UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
+ //wait for all SCPs finished
+ UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+ .noneMatch(p -> p instanceof ServerCrashProcedure));
+ TableName tableName = TABLES[0];
+ ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+ ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+ .getRegionStates().getServerNode(testServer);
+ Assert.assertNotNull(serverNode);
+ Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
+ serverNode.isInState(ServerState.ONLINE));
+ UTIL.createMultiRegionTable(tableName, FAMILY);
+ UTIL.waitTableEnabled(tableName);
+ Table table = UTIL.getConnection().getTable(tableName);
+ for (int i = 0; i < 100; i++) {
+ UTIL.loadTable(table, FAMILY);
+ }
+ List<Integer> ports =
+ UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
+ .map(serverName -> serverName.getPort()).collect(Collectors.toList());
+ LOG.info("Shutting down cluster");
+ UTIL.getHBaseCluster().killAll();
+ UTIL.getHBaseCluster().waitUntilShutDown();
+ LOG.info("Starting cluster the second time");
+ UTIL.restartHBaseCluster(3, ports);
+ UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
+ serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+ .getServerNode(testServer);
+ Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
+ Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
+ LOG.info("start to find the procedure of SCP for the severName we choose");
+ UTIL.waitFor(20000,
+ () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+ .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
+ && ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
+ Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
+ serverNode.isInState(ServerState.ONLINE));
+ LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
+ Assert.assertFalse(
+ UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+ Procedure procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+ .filter(p -> (p instanceof ServerCrashProcedure)
+ && ((ServerCrashProcedure) p).getServerName().equals(testServer))
+ .findAny().get();
+ UTIL.waitFor(20000, () -> procedure.isFinished());
+ LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
+ testServer);
+ Assert.assertFalse(
+ UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+ serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+ .getServerNode(testServer);
+ Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
+ }
+
+ @Test
public void testClusterRestart() throws Exception {
UTIL.startMiniCluster(3);
while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 0e4a84b..af2076e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -170,6 +171,43 @@ public class TestServerCrashProcedure {
}
}
+ @Test
+ public void testConcurrentSCPForSameServer() throws Exception {
+ final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
+ try (Table t = createTable(tableName)) {
+ // Load the table with a bit of data so some logs to split and some edits in each region.
+ this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
+ final int count = util.countRows(t);
+ assertTrue("expected some rows", count > 0);
+ // find the first server that match the request and executes the test
+ ServerName rsToKill = null;
+ for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
+ final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
+ if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == true) {
+ rsToKill = serverName;
+ break;
+ }
+ }
+ HMaster master = util.getHBaseCluster().getMaster();
+ final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
+ ServerCrashProcedure procB =
+ new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false);
+ AssignmentTestingUtil.killRs(util, rsToKill);
+ long procId = getSCPProcId(pExecutor);
+ Procedure procA = pExecutor.getProcedure(procId);
+ LOG.info("submit SCP procedureA");
+ util.waitFor(5000, () -> procA.hasLock());
+ LOG.info("procedureA acquired the lock");
+ assertEquals(Procedure.LockState.LOCK_EVENT_WAIT,
+ procB.acquireLock(pExecutor.getEnvironment()));
+ LOG.info("procedureB should not be able to get the lock");
+ util.waitFor(60000,
+ () -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED);
+ LOG.info("when procedure B get the lock, procedure A should be finished");
+ assertTrue(procA.isFinished());
+ }
+ }
+
protected void assertReplicaDistributed(final Table t) {
return;
}