You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/03 04:25:44 UTC

[hbase] branch master updated: HBASE-23632 DeadServer cleanup (#979)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new abcb1ee  HBASE-23632 DeadServer cleanup (#979)
abcb1ee is described below

commit abcb1ee81bdd791a91d6f74ae3e1fc34aa5b15ae
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Thu Jan 2 20:21:33 2020 -0800

    HBASE-23632 DeadServer cleanup (#979)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../org/apache/hadoop/hbase/master/DeadServer.java | 165 +++++++++------------
 .../apache/hadoop/hbase/master/ServerManager.java  |  28 ++--
 .../master/procedure/ServerCrashProcedure.java     |   2 +-
 .../apache/hadoop/hbase/master/TestDeadServer.java |  28 ++--
 .../hadoop/hbase/master/procedure/TestHBCKSCP.java |   5 +-
 5 files changed, 103 insertions(+), 125 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
index e49a69f..c527bc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +19,6 @@ package org.apache.hadoop.hbase.master;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,7 +38,13 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * Class to hold dead servers list and utility querying dead server list.
- * On znode expiration, servers are added here.
+ * Servers are added when they expire or when we find them in filesystem on startup.
+ * When a server crash procedure is queued, it will populate the processing list and
+ * then remove the server from processing list when done. Servers are removed from
+ * dead server list when a new instance is started over the old one on the same hostname and
+ * port or when new Master comes online tidying up after all initialization. Processing
+ * list and deadserver list are not tied together (you don't have to be in deadservers
+ * list to be processing and vice versa).
  */
 @InterfaceAudience.Private
 public class DeadServer {
@@ -56,37 +60,11 @@ public class DeadServer {
   private final Map<ServerName, Long> deadServers = new HashMap<>();
 
   /**
-   * Set of dead servers currently being processed
-   */
-  private final Set<ServerName> processingServers = new HashSet<ServerName>();
-
-  /**
-   * Handles restart of a server. The new server instance has a different start code.
-   * The new start code should be greater than the old one. We don't check that here.
-   *
-   * @param newServerName Servername as either <code>host:port</code> or
-   *                      <code>host,port,startcode</code>.
-   * @return true if this server was dead before and coming back alive again
+   * Set of dead servers currently being processed by a SCP.
+   * Added to this list at the start of SCP and removed after it is done
+   * processing the crash.
    */
-  public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
-    Iterator<ServerName> it = deadServers.keySet().iterator();
-    while (it.hasNext()) {
-      ServerName sn = it.next();
-      if (ServerName.isSameAddress(sn, newServerName)) {
-        // remove from deadServers
-        it.remove();
-        // remove from processingServers
-        boolean removed = processingServers.remove(sn);
-        if (removed) {
-          LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
-              processingServers.size());
-        }
-        return true;
-      }
-    }
-
-    return false;
-  }
+  private final Set<ServerName> processingServers = new HashSet<>();
 
   /**
    * @param serverName server name.
@@ -97,21 +75,13 @@ public class DeadServer {
   }
 
   /**
-   * @param serverName server name.
-   * @return true if this server is on the processing servers list false otherwise
-   */
-  public synchronized boolean isProcessingServer(final ServerName serverName) {
-    return processingServers.contains(serverName);
-  }
-
-  /**
    * Checks if there are currently any dead servers being processed by the
    * master.  Returns true if at least one region server is currently being
    * processed as dead.
    *
    * @return true if any RS are being processed as dead
    */
-  public synchronized boolean areDeadServersInProgress() {
+  synchronized boolean areDeadServersInProgress() {
     return !processingServers.isEmpty();
   }
 
@@ -124,41 +94,30 @@ public class DeadServer {
   /**
    * Adds the server to the dead server list if it's not there already.
    */
-  public synchronized void add(ServerName sn) {
-    if (!deadServers.containsKey(sn)){
-      deadServers.put(sn, EnvironmentEdgeManager.currentTime());
-    }
-    boolean added = processingServers.add(sn);
-    if (LOG.isDebugEnabled() && added) {
-      LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
-    }
+  synchronized void putIfAbsent(ServerName sn) {
+    this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
+    processing(sn);
   }
 
   /**
-   * Notify that we started processing this dead server.
-   * @param sn ServerName for the dead server.
+   * Add <code>sn</code> to set of processing deadservers.
+   * @see #finish(ServerName)
    */
-  public synchronized void notifyServer(ServerName sn) {
-    boolean added = processingServers.add(sn);
-    if (LOG.isDebugEnabled()) {
-      if (added) {
-        LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
-      }
-      LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
+  public synchronized void processing(ServerName sn) {
+    if (processingServers.add(sn)) {
+      // Only log on add.
+      LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
     }
   }
 
   /**
    * Complete processing for this dead server.
    * @param sn ServerName for the dead server.
+   * @see #processing(ServerName)
    */
   public synchronized void finish(ServerName sn) {
-    boolean removed = processingServers.remove(sn);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
-      if (removed) {
-        LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
-      }
+    if (processingServers.remove(sn)) {
+      LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
     }
   }
 
@@ -166,30 +125,59 @@ public class DeadServer {
     return deadServers.size();
   }
 
-  public synchronized boolean isEmpty() {
+  synchronized boolean isEmpty() {
     return deadServers.isEmpty();
   }
 
-  public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
+  /**
+   * Handles restart of a server. The new server instance has a different start code.
+   * The new start code should be greater than the old one. We don't check that here.
+   * Removes the old server from deadserver list.
+   *
+   * @param newServerName Servername as either <code>host:port</code> or
+   *                      <code>host,port,startcode</code>.
+   * @return true if this server was dead before and coming back alive again
+   */
+  synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
     Iterator<ServerName> it = deadServers.keySet().iterator();
     while (it.hasNext()) {
-      ServerName sn = it.next();
-      if (ServerName.isSameAddress(sn, newServerName)) {
-        // remove from deadServers
-        it.remove();
-        // remove from processingServers
-        boolean removed = processingServers.remove(sn);
-        if (removed) {
-          LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
-        }
+      if (cleanOldServerName(newServerName, it)) {
+        return true;
       }
     }
+    return false;
+  }
+
+  synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
+    Iterator<ServerName> it = deadServers.keySet().iterator();
+    while (it.hasNext()) {
+      cleanOldServerName(newServerName, it);
+    }
+  }
+
+  /**
+   * @param newServerName Server to match port and hostname against.
+   * @param deadServerIterator Iterator primed so can call 'next' on it.
+   * @return True if <code>newServerName</code> and current primed
+   *   iterator ServerName have same host and port and we removed old server
+   *   from iterator. Does NOT remove it from the processing list.
+   */
+  private boolean cleanOldServerName(ServerName newServerName,
+      Iterator<ServerName> deadServerIterator) {
+    ServerName sn = deadServerIterator.next();
+    if (ServerName.isSameAddress(sn, newServerName)) {
+      // Remove from dead servers list. Don't remove from the processing list --
+      // let the SCP do it when it is done.
+      deadServerIterator.remove();
+      return true;
+    }
+    return false;
   }
 
   @Override
   public synchronized String toString() {
     // Display unified set of servers from both maps
-    Set<ServerName> servers = new HashSet<ServerName>();
+    Set<ServerName> servers = new HashSet<>();
     servers.addAll(deadServers.keySet());
     servers.addAll(processingServers);
     StringBuilder sb = new StringBuilder();
@@ -211,7 +199,7 @@ public class DeadServer {
    * @param ts the time, 0 for all
    * @return a sorted array list, by death time, lowest values first.
    */
-  public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
+  synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
     List<Pair<ServerName, Long>> res =  new ArrayList<>(size());
 
     for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
@@ -220,7 +208,7 @@ public class DeadServer {
       }
     }
 
-    Collections.sort(res, ServerNameDeathDateComparator);
+    Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
     return res;
   }
   
@@ -234,28 +222,15 @@ public class DeadServer {
     return time == null ? null : new Date(time);
   }
 
-  private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
-      new Comparator<Pair<ServerName, Long>>(){
-
-    @Override
-    public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
-      return o1.getSecond().compareTo(o2.getSecond());
-    }
-  };
-
   /**
-   * remove the specified dead server
+   * Called from rpc by operator cleaning up deadserver list.
    * @param deadServerName the dead server name
    * @return true if this server was removed
    */
-
   public synchronized boolean removeDeadServer(final ServerName deadServerName) {
     Preconditions.checkState(!processingServers.contains(deadServerName),
       "Asked to remove server still in processingServers set " + deadServerName +
           " (numProcessing=" + processingServers.size() + ")");
-    if (deadServers.remove(deadServerName) == null) {
-      return false;
-    }
-    return true;
+    return this.deadServers.remove(deadServerName) != null;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 75ebfab..f9e5c77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -345,7 +345,7 @@ public class ServerManager {
    */
   void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
       Set<ServerName> liveServersFromWALDir) {
-    deadServersFromPE.forEach(deadservers::add);
+    deadServersFromPE.forEach(deadservers::putIfAbsent);
     liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
       .forEach(this::expireServer);
   }
@@ -376,6 +376,8 @@ public class ServerManager {
   }
 
   /**
+   * Called when RegionServer first reports in for duty and thereafter each
+   * time it heartbeats to make sure it has not been marked as dead.
    * If this server is on the dead list, reject it with a YouAreDeadException.
    * If it was dead but came back with a new start code, remove the old entry
    * from the dead list.
@@ -384,21 +386,20 @@ public class ServerManager {
   private void checkIsDead(final ServerName serverName, final String what)
       throws YouAreDeadException {
     if (this.deadservers.isDeadServer(serverName)) {
-      // host name, port and start code all match with existing one of the
-      // dead servers. So, this server must be dead.
+      // Exact match: host name, port and start code all match with existing one of the
+      // dead servers. So, this server must be dead. Tell it to kill itself.
       String message = "Server " + what + " rejected; currently processing " +
           serverName + " as dead server";
       LOG.debug(message);
       throw new YouAreDeadException(message);
     }
-    // remove dead server with same hostname and port of newly checking in rs after master
-    // initialization.See HBASE-5916 for more information.
-    if ((this.master == null || this.master.isInitialized())
-        && this.deadservers.cleanPreviousInstance(serverName)) {
+    // Remove dead server with same hostname and port of newly checking in rs after master
+    // initialization. See HBASE-5916 for more information.
+    if ((this.master == null || this.master.isInitialized()) &&
+        this.deadservers.cleanPreviousInstance(serverName)) {
       // This server has now become alive after we marked it as dead.
       // We removed it's previous entry from the dead list to reflect it.
-      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
-          " removed it from the dead servers list");
+      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
     }
   }
 
@@ -609,7 +610,10 @@ public class ServerManager {
     return pid;
   }
 
-  // Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
+  /**
+   * Called when server has expired.
+   */
+  // Locking in this class needs cleanup.
   @VisibleForTesting
   public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
     synchronized (this.onlineServers) {
@@ -618,7 +622,7 @@ public class ServerManager {
         // Remove the server from the known servers lists and update load info BUT
         // add to deadservers first; do this so it'll show in dead servers list if
         // not in online servers list.
-        this.deadservers.add(sn);
+        this.deadservers.putIfAbsent(sn);
         this.onlineServers.remove(sn);
         onlineServers.notifyAll();
       } else {
@@ -872,7 +876,7 @@ public class ServerManager {
   /**
    * Check if a server is known to be dead.  A server can be online,
    * or known to be dead, or unknown to this manager (i.e, not online,
-   * not known to be dead either. it is simply not tracked by the
+   * not known to be dead either; it is simply not tracked by the
    * master any more, for example, a very old previous instance).
    */
   public synchronized boolean isServerDead(ServerName serverName) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 12c699bc..a6ef5ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -124,7 +124,7 @@ public class ServerCrashProcedure
     // This adds server to the DeadServer processing list but not to the DeadServers list.
     // Server gets removed from processing list below on procedure successful finish.
     if (!notifiedDeadServer) {
-      services.getServerManager().getDeadServers().notifyServer(serverName);
+      services.getServerManager().getDeadServers().processing(serverName);
       notifiedDeadServer = true;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
index 73ff789..f78d490 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
@@ -67,20 +67,20 @@ public class TestDeadServer {
 
   @Test public void testIsDead() {
     DeadServer ds = new DeadServer();
-    ds.add(hostname123);
-    ds.notifyServer(hostname123);
+    ds.putIfAbsent(hostname123);
+    ds.processing(hostname123);
     assertTrue(ds.areDeadServersInProgress());
     ds.finish(hostname123);
     assertFalse(ds.areDeadServersInProgress());
 
-    ds.add(hostname1234);
-    ds.notifyServer(hostname1234);
+    ds.putIfAbsent(hostname1234);
+    ds.processing(hostname1234);
     assertTrue(ds.areDeadServersInProgress());
     ds.finish(hostname1234);
     assertFalse(ds.areDeadServersInProgress());
 
-    ds.add(hostname12345);
-    ds.notifyServer(hostname12345);
+    ds.putIfAbsent(hostname12345);
+    ds.processing(hostname12345);
     assertTrue(ds.areDeadServersInProgress());
     ds.finish(hostname12345);
     assertFalse(ds.areDeadServersInProgress());
@@ -90,7 +90,7 @@ public class TestDeadServer {
 
     final ServerName deadServer = ServerName.valueOf("127.0.0.1", 9090, 112321L);
     assertFalse(ds.cleanPreviousInstance(deadServer));
-    ds.add(deadServer);
+    ds.putIfAbsent(deadServer);
     assertTrue(ds.isDeadServer(deadServer));
     Set<ServerName> deadServerNames = ds.copyServerNames();
     for (ServerName eachDeadServer : deadServerNames) {
@@ -123,11 +123,11 @@ public class TestDeadServer {
 
     DeadServer d = new DeadServer();
 
-    d.add(hostname123);
+    d.putIfAbsent(hostname123);
     mee.incValue(1);
-    d.add(hostname1234);
+    d.putIfAbsent(hostname1234);
     mee.incValue(1);
-    d.add(hostname12345);
+    d.putIfAbsent(hostname12345);
 
     List<Pair<ServerName, Long>> copy = d.copyDeadServersSince(2L);
     Assert.assertEquals(2, copy.size());
@@ -144,7 +144,7 @@ public class TestDeadServer {
   @Test
   public void testClean(){
     DeadServer d = new DeadServer();
-    d.add(hostname123);
+    d.putIfAbsent(hostname123);
 
     d.cleanPreviousInstance(hostname12345);
     Assert.assertFalse(d.isEmpty());
@@ -159,8 +159,8 @@ public class TestDeadServer {
   @Test
   public void testClearDeadServer(){
     DeadServer d = new DeadServer();
-    d.add(hostname123);
-    d.add(hostname1234);
+    d.putIfAbsent(hostname123);
+    d.putIfAbsent(hostname1234);
     Assert.assertEquals(2, d.size());
 
     d.finish(hostname123);
@@ -170,7 +170,7 @@ public class TestDeadServer {
     d.removeDeadServer(hostname1234);
     Assert.assertTrue(d.isEmpty());
 
-    d.add(hostname1234);
+    d.putIfAbsent(hostname1234);
     Assert.assertFalse(d.removeDeadServer(hostname123_2));
     Assert.assertEquals(1, d.size());
     d.finish(hostname1234);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
index a74106c..8396ac0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
@@ -104,7 +104,7 @@ public class TestHBCKSCP extends TestSCPBase {
     assertEquals(RegionState.State.OPEN.toString(),
         Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
     ServerName serverName = MetaTableAccessor.getServerName(r, 0);
-    assertTrue(rsServerName.equals(serverName));
+    assertEquals(rsServerName, serverName);
     // moveFrom adds to dead servers and adds it to processing list only we will
     // not be processing this server 'normally'. Remove it from processing by
     // calling 'finish' and then remove it from dead servers so rsServerName
@@ -154,14 +154,13 @@ public class TestHBCKSCP extends TestSCPBase {
     assertNotEquals(rsServerName, serverName);
     // Make sure no mention of old server post SCP.
     assertFalse(searchMeta(master, rsServerName));
-    assertFalse(master.getServerManager().getDeadServers().isProcessingServer(rsServerName));
     assertFalse(master.getServerManager().getDeadServers().isDeadServer(rsServerName));
   }
 
   /**
    * @return True if we find reference to <code>sn</code> in meta table.
    */
-  boolean searchMeta(HMaster master, ServerName sn) throws IOException {
+  private boolean searchMeta(HMaster master, ServerName sn) throws IOException {
     List<Pair<RegionInfo, ServerName>> ps =
       MetaTableAccessor.getTableRegionsAndLocations(master.getConnection(), null);
     for (Pair<RegionInfo, ServerName> p: ps) {