You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/03 04:25:44 UTC
[hbase] branch master updated: HBASE-23632 DeadServer cleanup (#979)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new abcb1ee HBASE-23632 DeadServer cleanup (#979)
abcb1ee is described below
commit abcb1ee81bdd791a91d6f74ae3e1fc34aa5b15ae
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Thu Jan 2 20:21:33 2020 -0800
HBASE-23632 DeadServer cleanup (#979)
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../org/apache/hadoop/hbase/master/DeadServer.java | 165 +++++++++------------
.../apache/hadoop/hbase/master/ServerManager.java | 28 ++--
.../master/procedure/ServerCrashProcedure.java | 2 +-
.../apache/hadoop/hbase/master/TestDeadServer.java | 28 ++--
.../hadoop/hbase/master/procedure/TestHBCKSCP.java | 5 +-
5 files changed, 103 insertions(+), 125 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
index e49a69f..c527bc0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,7 +19,6 @@ package org.apache.hadoop.hbase.master;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,7 +38,13 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Class to hold dead servers list and utility querying dead server list.
- * On znode expiration, servers are added here.
+ * Servers are added when they expire or when we find them in filesystem on startup.
+ * When a server crash procedure is queued, it will populate the processing list and
+ * then remove the server from processing list when done. Servers are removed from
+ * dead server list when a new instance is started over the old on same hostname and
+ * port or when new Master comes online tidying up after all initialization. Processing
+ * list and deadserver list are not tied together (you don't have to be in deadservers
+ * list to be processing and vice versa).
*/
@InterfaceAudience.Private
public class DeadServer {
@@ -56,37 +60,11 @@ public class DeadServer {
private final Map<ServerName, Long> deadServers = new HashMap<>();
/**
- * Set of dead servers currently being processed
- */
- private final Set<ServerName> processingServers = new HashSet<ServerName>();
-
- /**
- * Handles restart of a server. The new server instance has a different start code.
- * The new start code should be greater than the old one. We don't check that here.
- *
- * @param newServerName Servername as either <code>host:port</code> or
- * <code>host,port,startcode</code>.
- * @return true if this server was dead before and coming back alive again
+ * Set of dead servers currently being processed by a SCP.
+ * Added to this list at the start of SCP and removed after it is done
+ * processing the crash.
*/
- public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
- Iterator<ServerName> it = deadServers.keySet().iterator();
- while (it.hasNext()) {
- ServerName sn = it.next();
- if (ServerName.isSameAddress(sn, newServerName)) {
- // remove from deadServers
- it.remove();
- // remove from processingServers
- boolean removed = processingServers.remove(sn);
- if (removed) {
- LOG.debug("Removed {}, processing={}, numProcessing={}", sn, removed,
- processingServers.size());
- }
- return true;
- }
- }
-
- return false;
- }
+ private final Set<ServerName> processingServers = new HashSet<>();
/**
* @param serverName server name.
@@ -97,21 +75,13 @@ public class DeadServer {
}
/**
- * @param serverName server name.
- * @return true if this server is on the processing servers list false otherwise
- */
- public synchronized boolean isProcessingServer(final ServerName serverName) {
- return processingServers.contains(serverName);
- }
-
- /**
* Checks if there are currently any dead servers being processed by the
* master. Returns true if at least one region server is currently being
* processed as dead.
*
* @return true if any RS are being processed as dead
*/
- public synchronized boolean areDeadServersInProgress() {
+ synchronized boolean areDeadServersInProgress() {
return !processingServers.isEmpty();
}
@@ -124,41 +94,30 @@ public class DeadServer {
/**
* Adds the server to the dead server list if it's not there already.
*/
- public synchronized void add(ServerName sn) {
- if (!deadServers.containsKey(sn)){
- deadServers.put(sn, EnvironmentEdgeManager.currentTime());
- }
- boolean added = processingServers.add(sn);
- if (LOG.isDebugEnabled() && added) {
- LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
- }
+ synchronized void putIfAbsent(ServerName sn) {
+ this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
+ processing(sn);
}
/**
- * Notify that we started processing this dead server.
- * @param sn ServerName for the dead server.
+ * Add <code>sn</code> to set of processing deadservers.
+ * @see #finish(ServerName)
*/
- public synchronized void notifyServer(ServerName sn) {
- boolean added = processingServers.add(sn);
- if (LOG.isDebugEnabled()) {
- if (added) {
- LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
- }
- LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
+ public synchronized void processing(ServerName sn) {
+ if (processingServers.add(sn)) {
+ // Only log on add.
+ LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
}
}
/**
* Complete processing for this dead server.
* @param sn ServerName for the dead server.
+ * @see #processing(ServerName)
*/
public synchronized void finish(ServerName sn) {
- boolean removed = processingServers.remove(sn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
- if (removed) {
- LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
- }
+ if (processingServers.remove(sn)) {
+ LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
}
}
@@ -166,30 +125,59 @@ public class DeadServer {
return deadServers.size();
}
- public synchronized boolean isEmpty() {
+ synchronized boolean isEmpty() {
return deadServers.isEmpty();
}
- public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
+ /**
+ * Handles restart of a server. The new server instance has a different start code.
+ * The new start code should be greater than the old one. We don't check that here.
+ * Removes the old server from deadserver list.
+ *
+ * @param newServerName Servername as either <code>host:port</code> or
+ * <code>host,port,startcode</code>.
+ * @return true if this server was dead before and coming back alive again
+ */
+ synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
Iterator<ServerName> it = deadServers.keySet().iterator();
while (it.hasNext()) {
- ServerName sn = it.next();
- if (ServerName.isSameAddress(sn, newServerName)) {
- // remove from deadServers
- it.remove();
- // remove from processingServers
- boolean removed = processingServers.remove(sn);
- if (removed) {
- LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
- }
+ if (cleanOldServerName(newServerName, it)) {
+ return true;
}
}
+ return false;
+ }
+
+ synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
+ Iterator<ServerName> it = deadServers.keySet().iterator();
+ while (it.hasNext()) {
+ cleanOldServerName(newServerName, it);
+ }
+ }
+
+ /**
+ * @param newServerName Server to match port and hostname against.
+ * @param deadServerIterator Iterator primed so we can call 'next' on it.
+ * @return True if <code>newServerName</code> and current primed
+ * iterator ServerName have same host and port and we removed old server
+ * from iterator and from processing list.
+ */
+ private boolean cleanOldServerName(ServerName newServerName,
+ Iterator<ServerName> deadServerIterator) {
+ ServerName sn = deadServerIterator.next();
+ if (ServerName.isSameAddress(sn, newServerName)) {
+ // Remove from dead servers list. Don't remove from the processing list --
+ // let the SCP do it when it is done.
+ deadServerIterator.remove();
+ return true;
+ }
+ return false;
}
@Override
public synchronized String toString() {
// Display unified set of servers from both maps
- Set<ServerName> servers = new HashSet<ServerName>();
+ Set<ServerName> servers = new HashSet<>();
servers.addAll(deadServers.keySet());
servers.addAll(processingServers);
StringBuilder sb = new StringBuilder();
@@ -211,7 +199,7 @@ public class DeadServer {
* @param ts the time, 0 for all
* @return a sorted array list, by death time, lowest values first.
*/
- public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
+ synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
List<Pair<ServerName, Long>> res = new ArrayList<>(size());
for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
@@ -220,7 +208,7 @@ public class DeadServer {
}
}
- Collections.sort(res, ServerNameDeathDateComparator);
+ Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
return res;
}
@@ -234,28 +222,15 @@ public class DeadServer {
return time == null ? null : new Date(time);
}
- private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
- new Comparator<Pair<ServerName, Long>>(){
-
- @Override
- public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
- return o1.getSecond().compareTo(o2.getSecond());
- }
- };
-
/**
- * remove the specified dead server
+ * Called from rpc by operator cleaning up deadserver list.
* @param deadServerName the dead server name
* @return true if this server was removed
*/
-
public synchronized boolean removeDeadServer(final ServerName deadServerName) {
Preconditions.checkState(!processingServers.contains(deadServerName),
"Asked to remove server still in processingServers set " + deadServerName +
" (numProcessing=" + processingServers.size() + ")");
- if (deadServers.remove(deadServerName) == null) {
- return false;
- }
- return true;
+ return this.deadServers.remove(deadServerName) != null;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 75ebfab..f9e5c77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -345,7 +345,7 @@ public class ServerManager {
*/
void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
Set<ServerName> liveServersFromWALDir) {
- deadServersFromPE.forEach(deadservers::add);
+ deadServersFromPE.forEach(deadservers::putIfAbsent);
liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
.forEach(this::expireServer);
}
@@ -376,6 +376,8 @@ public class ServerManager {
}
/**
+ * Called when RegionServer first reports in for duty and thereafter each
+ * time it heartbeats to make sure it has not been figured for dead.
* If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry
* from the dead list.
@@ -384,21 +386,20 @@ public class ServerManager {
private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException {
if (this.deadservers.isDeadServer(serverName)) {
- // host name, port and start code all match with existing one of the
- // dead servers. So, this server must be dead.
+ // Exact match: host name, port and start code all match with existing one of the
+ // dead servers. So, this server must be dead. Tell it to kill itself.
String message = "Server " + what + " rejected; currently processing " +
serverName + " as dead server";
LOG.debug(message);
throw new YouAreDeadException(message);
}
- // remove dead server with same hostname and port of newly checking in rs after master
- // initialization.See HBASE-5916 for more information.
- if ((this.master == null || this.master.isInitialized())
- && this.deadservers.cleanPreviousInstance(serverName)) {
+ // Remove dead server with same hostname and port of newly checking in rs after master
+ // initialization. See HBASE-5916 for more information.
+ if ((this.master == null || this.master.isInitialized()) &&
+ this.deadservers.cleanPreviousInstance(serverName)) {
// This server has now become alive after we marked it as dead.
// We removed it's previous entry from the dead list to reflect it.
- LOG.debug(what + ":" + " Server " + serverName + " came back up," +
- " removed it from the dead servers list");
+ LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
}
}
@@ -609,7 +610,10 @@ public class ServerManager {
return pid;
}
- // Note: this is currently invoked from RPC, not just tests. Locking in this class needs cleanup.
+ /**
+ * Called when server has expired.
+ */
+ // Locking in this class needs cleanup.
@VisibleForTesting
public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
synchronized (this.onlineServers) {
@@ -618,7 +622,7 @@ public class ServerManager {
// Remove the server from the known servers lists and update load info BUT
// add to deadservers first; do this so it'll show in dead servers list if
// not in online servers list.
- this.deadservers.add(sn);
+ this.deadservers.putIfAbsent(sn);
this.onlineServers.remove(sn);
onlineServers.notifyAll();
} else {
@@ -872,7 +876,7 @@ public class ServerManager {
/**
* Check if a server is known to be dead. A server can be online,
* or known to be dead, or unknown to this manager (i.e, not online,
- * not known to be dead either. it is simply not tracked by the
+ * not known to be dead either; it is simply not tracked by the
* master any more, for example, a very old previous instance).
*/
public synchronized boolean isServerDead(ServerName serverName) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 12c699bc..a6ef5ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -124,7 +124,7 @@ public class ServerCrashProcedure
// This adds server to the DeadServer processing list but not to the DeadServers list.
// Server gets removed from processing list below on procedure successful finish.
if (!notifiedDeadServer) {
- services.getServerManager().getDeadServers().notifyServer(serverName);
+ services.getServerManager().getDeadServers().processing(serverName);
notifiedDeadServer = true;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
index 73ff789..f78d490 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java
@@ -67,20 +67,20 @@ public class TestDeadServer {
@Test public void testIsDead() {
DeadServer ds = new DeadServer();
- ds.add(hostname123);
- ds.notifyServer(hostname123);
+ ds.putIfAbsent(hostname123);
+ ds.processing(hostname123);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname123);
assertFalse(ds.areDeadServersInProgress());
- ds.add(hostname1234);
- ds.notifyServer(hostname1234);
+ ds.putIfAbsent(hostname1234);
+ ds.processing(hostname1234);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname1234);
assertFalse(ds.areDeadServersInProgress());
- ds.add(hostname12345);
- ds.notifyServer(hostname12345);
+ ds.putIfAbsent(hostname12345);
+ ds.processing(hostname12345);
assertTrue(ds.areDeadServersInProgress());
ds.finish(hostname12345);
assertFalse(ds.areDeadServersInProgress());
@@ -90,7 +90,7 @@ public class TestDeadServer {
final ServerName deadServer = ServerName.valueOf("127.0.0.1", 9090, 112321L);
assertFalse(ds.cleanPreviousInstance(deadServer));
- ds.add(deadServer);
+ ds.putIfAbsent(deadServer);
assertTrue(ds.isDeadServer(deadServer));
Set<ServerName> deadServerNames = ds.copyServerNames();
for (ServerName eachDeadServer : deadServerNames) {
@@ -123,11 +123,11 @@ public class TestDeadServer {
DeadServer d = new DeadServer();
- d.add(hostname123);
+ d.putIfAbsent(hostname123);
mee.incValue(1);
- d.add(hostname1234);
+ d.putIfAbsent(hostname1234);
mee.incValue(1);
- d.add(hostname12345);
+ d.putIfAbsent(hostname12345);
List<Pair<ServerName, Long>> copy = d.copyDeadServersSince(2L);
Assert.assertEquals(2, copy.size());
@@ -144,7 +144,7 @@ public class TestDeadServer {
@Test
public void testClean(){
DeadServer d = new DeadServer();
- d.add(hostname123);
+ d.putIfAbsent(hostname123);
d.cleanPreviousInstance(hostname12345);
Assert.assertFalse(d.isEmpty());
@@ -159,8 +159,8 @@ public class TestDeadServer {
@Test
public void testClearDeadServer(){
DeadServer d = new DeadServer();
- d.add(hostname123);
- d.add(hostname1234);
+ d.putIfAbsent(hostname123);
+ d.putIfAbsent(hostname1234);
Assert.assertEquals(2, d.size());
d.finish(hostname123);
@@ -170,7 +170,7 @@ public class TestDeadServer {
d.removeDeadServer(hostname1234);
Assert.assertTrue(d.isEmpty());
- d.add(hostname1234);
+ d.putIfAbsent(hostname1234);
Assert.assertFalse(d.removeDeadServer(hostname123_2));
Assert.assertEquals(1, d.size());
d.finish(hostname1234);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
index a74106c..8396ac0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestHBCKSCP.java
@@ -104,7 +104,7 @@ public class TestHBCKSCP extends TestSCPBase {
assertEquals(RegionState.State.OPEN.toString(),
Bytes.toString(r.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)));
ServerName serverName = MetaTableAccessor.getServerName(r, 0);
- assertTrue(rsServerName.equals(serverName));
+ assertEquals(rsServerName, serverName);
// moveFrom adds to dead servers and adds it to processing list only we will
// not be processing this server 'normally'. Remove it from processing by
// calling 'finish' and then remove it from dead servers so rsServerName
@@ -154,14 +154,13 @@ public class TestHBCKSCP extends TestSCPBase {
assertNotEquals(rsServerName, serverName);
// Make sure no mention of old server post SCP.
assertFalse(searchMeta(master, rsServerName));
- assertFalse(master.getServerManager().getDeadServers().isProcessingServer(rsServerName));
assertFalse(master.getServerManager().getDeadServers().isDeadServer(rsServerName));
}
/**
* @return True if we find reference to <code>sn</code> in meta table.
*/
- boolean searchMeta(HMaster master, ServerName sn) throws IOException {
+ private boolean searchMeta(HMaster master, ServerName sn) throws IOException {
List<Pair<RegionInfo, ServerName>> ps =
MetaTableAccessor.getTableRegionsAndLocations(master.getConnection(), null);
for (Pair<RegionInfo, ServerName> p: ps) {