You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ti...@apache.org on 2018/12/18 08:57:43 UTC

hbase git commit: HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server

Repository: hbase
Updated Branches:
  refs/heads/master f78284685 -> c448604ce


HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c448604c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c448604c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c448604c

Branch: refs/heads/master
Commit: c448604ceb987d113913f0583452b2abce04db0d
Parents: f782846
Author: Jingyun Tian <ti...@gmail.com>
Authored: Mon Dec 17 19:32:23 2018 +0800
Committer: Jingyun Tian <ti...@apache.org>
Committed: Tue Dec 18 16:57:11 2018 +0800

----------------------------------------------------------------------
 .../hbase/master/RegionServerTracker.java       |  3 +
 .../hadoop/hbase/master/ServerManager.java      | 25 ++++----
 .../master/assignment/AssignmentManager.java    | 28 ++++++---
 .../hbase/master/assignment/RegionStates.java   |  3 +-
 .../hbase/master/assignment/ServerState.java    |  2 +-
 .../master/assignment/ServerStateNode.java      |  2 +-
 .../master/procedure/ServerCrashProcedure.java  | 16 ++---
 .../hadoop/hbase/HBaseTestingUtility.java       |  7 ++-
 .../hadoop/hbase/master/TestRestartCluster.java | 65 ++++++++++++++++++++
 .../procedure/TestServerCrashProcedure.java     | 38 ++++++++++++
 10 files changed, 155 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
index f419732..9d33a21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
@@ -128,6 +128,9 @@ public class RegionServerTracker extends ZKListener {
     // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
     splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s)).
       forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
+    //create ServerNode for all possible live servers from wal directory
+    liveServersFromWALDir.stream()
+        .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
     watcher.registerListener(this);
     synchronized (this) {
       List<String> servers =

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index dc76d72..86d72d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -602,19 +602,22 @@ public class ServerManager {
       return false;
     }
     LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
-    master.getAssignmentManager().submitServerCrash(serverName, true);
-
-    // Tell our listeners that a server was removed
-    if (!this.listeners.isEmpty()) {
-      for (ServerListener listener : this.listeners) {
-        listener.serverRemoved(serverName);
+    long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
+    if(pid <= 0) {
+      return false;
+    } else {
+      // Tell our listeners that a server was removed
+      if (!this.listeners.isEmpty()) {
+        for (ServerListener listener : this.listeners) {
+          listener.serverRemoved(serverName);
+        }
       }
+      // trigger a persist of flushedSeqId
+      if (flushedSeqIdFlusher != null) {
+        flushedSeqIdFlusher.triggerNow();
+      }
+      return true;
     }
-    // trigger a persist of flushedSeqId
-    if (flushedSeqIdFlusher != null) {
-      flushedSeqIdFlusher.triggerNow();
-    }
-    return true;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index a564ea9..b7c2203 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1343,24 +1343,36 @@ public class AssignmentManager {
   public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
     boolean carryingMeta;
     long pid;
-    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+    ServerStateNode serverNode = regionStates.getServerNode(serverName);
+    if(serverNode == null){
+      LOG.info("Skip to add SCP for {} since this server should be OFFLINE already", serverName);
+      return -1;
+    }
     // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
     // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
     // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
     // sure that, the region list fetched by SCP will not be changed any more.
     serverNode.writeLock().lock();
     try {
-      serverNode.setState(ServerState.CRASHED);
-      carryingMeta = isCarryingMeta(serverName);
       ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-      pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
-        shouldSplitWal, carryingMeta));
+      carryingMeta = isCarryingMeta(serverName);
+      if (!serverNode.isInState(ServerState.ONLINE)) {
+        LOG.info(
+          "Skip to add SCP for {} with meta= {}, " +
+              "since there should be a SCP is processing or already done for this server node",
+          serverName, carryingMeta);
+        return -1;
+      } else {
+        serverNode.setState(ServerState.CRASHED);
+        pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+            serverName, shouldSplitWal, carryingMeta));
+        LOG.info(
+          "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
+          serverName, carryingMeta, pid);
+      }
     } finally {
       serverNode.writeLock().unlock();
     }
-    LOG.info(
-      "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
-      serverName, carryingMeta, pid);
     return pid;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 7b85409..1470a5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -738,7 +738,8 @@ public class RegionStates {
     serverMap.remove(serverName);
   }
 
-  ServerStateNode getServerNode(final ServerName serverName) {
+  @VisibleForTesting
+  public ServerStateNode getServerNode(final ServerName serverName) {
     return serverMap.get(serverName);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
index 3efe6e2..c86a60e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerState.java
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Server State.
  */
 @InterfaceAudience.Private
-enum ServerState {
+public enum ServerState {
   /**
    * Initial state. Available.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
index 6f763aa..11883db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/ServerStateNode.java
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * State of Server; list of hosted regions, etc.
  */
 @InterfaceAudience.Private
-class ServerStateNode implements Comparable<ServerStateNode> {
+public class ServerStateNode implements Comparable<ServerStateNode> {
 
   private final Set<RegionStateNode> regions;
   private final ServerName serverName;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index b93f8fa..05bcd28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -333,17 +333,6 @@ public class ServerCrashProcedure
     return ServerOperationType.CRASH_HANDLER;
   }
 
-  /**
-   * For this procedure, yield at end of each successful flow step so that all crashed servers
-   * can make progress rather than do the default which has each procedure running to completion
-   * before we move to the next. For crashed servers, especially if running with distributed log
-   * replay, we will want all servers to come along; we do not want the scenario where a server is
-   * stuck waiting for regions to online so it can replay edits.
-   */
-  @Override
-  protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) {
-    return true;
-  }
 
   @Override
   protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
@@ -390,4 +379,9 @@ public class ServerCrashProcedure
   protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
     return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
   }
+
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0cd5a22..7bfbfe1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1187,6 +1187,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    * @param servers number of region servers
    */
   public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
+    this.restartHBaseCluster(servers, null);
+  }
+
+  public void restartHBaseCluster(int servers, List<Integer> ports)
+      throws IOException, InterruptedException {
     if (hbaseAdmin != null) {
       hbaseAdmin.close();
       hbaseAdmin = null;
@@ -1195,7 +1200,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       this.connection.close();
       this.connection = null;
     }
-    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
+    this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 4ba1876..e55e375 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -33,12 +35,18 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.master.assignment.ServerStateNode;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -67,6 +75,63 @@ public class TestRestartCluster {
   }
 
   @Test
+  public void testClusterRestartFailOver() throws Exception {
+    UTIL.startMiniCluster(3);
+    UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized());
+    //wait for all SCPs finished
+    UTIL.waitFor(20000, () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+        .noneMatch(p -> p instanceof ServerCrashProcedure));
+    TableName tableName = TABLES[0];
+    ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    ServerStateNode serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager()
+        .getRegionStates().getServerNode(testServer);
+    Assert.assertNotNull(serverNode);
+    Assert.assertTrue("serverNode should be ONLINE when cluster runs normally",
+      serverNode.isInState(ServerState.ONLINE));
+    UTIL.createMultiRegionTable(tableName, FAMILY);
+    UTIL.waitTableEnabled(tableName);
+    Table table = UTIL.getConnection().getTable(tableName);
+    for (int i = 0; i < 100; i++) {
+      UTIL.loadTable(table, FAMILY);
+    }
+    List<Integer> ports =
+        UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
+            .map(serverName -> serverName.getPort()).collect(Collectors.toList());
+    LOG.info("Shutting down cluster");
+    UTIL.getHBaseCluster().killAll();
+    UTIL.getHBaseCluster().waitUntilShutDown();
+    LOG.info("Starting cluster the second time");
+    UTIL.restartHBaseCluster(3, ports);
+    UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized());
+    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+        .getServerNode(testServer);
+    Assert.assertNotNull("serverNode should not be null when restart whole cluster", serverNode);
+    Assert.assertFalse(serverNode.isInState(ServerState.ONLINE));
+    LOG.info("start to find the procedure of SCP for the severName we choose");
+    UTIL.waitFor(20000,
+      () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+          .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
+              && ((ServerCrashProcedure) procedure).getServerName().equals(testServer)));
+    Assert.assertFalse("serverNode should not be ONLINE during SCP processing",
+      serverNode.isInState(ServerState.ONLINE));
+    LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
+    Assert.assertFalse(
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+    Procedure procedure = UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+        .filter(p -> (p instanceof ServerCrashProcedure)
+            && ((ServerCrashProcedure) p).getServerName().equals(testServer))
+        .findAny().get();
+    UTIL.waitFor(20000, () -> procedure.isFinished());
+    LOG.info("even when the SCP is finished, the duplicate SCP should not be scheduled for {}",
+      testServer);
+    Assert.assertFalse(
+      UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+    serverNode = UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+        .getServerNode(testServer);
+    Assert.assertNull("serverNode should be deleted after SCP finished", serverNode);
+  }
+
+  @Test
   public void testClusterRestart() throws Exception {
     UTIL.startMiniCluster(3);
     while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c448604c/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 0e4a84b..af2076e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
@@ -170,6 +171,43 @@ public class TestServerCrashProcedure {
     }
   }
 
+  @Test
+  public void testConcurrentSCPForSameServer() throws Exception {
+    final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
+    try (Table t = createTable(tableName)) {
+      // Load the table with a bit of data so some logs to split and some edits in each region.
+      this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
+      final int count = util.countRows(t);
+      assertTrue("expected some rows", count > 0);
+      // find the first server that match the request and executes the test
+      ServerName rsToKill = null;
+      for (RegionInfo hri : util.getAdmin().getRegions(tableName)) {
+        final ServerName serverName = AssignmentTestingUtil.getServerHoldingRegion(util, hri);
+        if (AssignmentTestingUtil.isServerHoldingMeta(util, serverName) == true) {
+          rsToKill = serverName;
+          break;
+        }
+      }
+      HMaster master = util.getHBaseCluster().getMaster();
+      final ProcedureExecutor<MasterProcedureEnv> pExecutor = master.getMasterProcedureExecutor();
+      ServerCrashProcedure procB =
+          new ServerCrashProcedure(pExecutor.getEnvironment(), rsToKill, false, false);
+      AssignmentTestingUtil.killRs(util, rsToKill);
+      long procId = getSCPProcId(pExecutor);
+      Procedure procA = pExecutor.getProcedure(procId);
+      LOG.info("submit SCP procedureA");
+      util.waitFor(5000, () -> procA.hasLock());
+      LOG.info("procedureA acquired the lock");
+      assertEquals(Procedure.LockState.LOCK_EVENT_WAIT,
+          procB.acquireLock(pExecutor.getEnvironment()));
+      LOG.info("procedureB should not be able to get the lock");
+      util.waitFor(60000,
+        () -> procB.acquireLock(pExecutor.getEnvironment()) == Procedure.LockState.LOCK_ACQUIRED);
+      LOG.info("when procedure B get the lock, procedure A should be finished");
+      assertTrue(procA.isFinished());
+    }
+  }
+
   protected void assertReplicaDistributed(final Table t) {
     return;
   }