You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/08/30 21:04:19 UTC
[hadoop] branch branch-2 updated: YARN-7585. NodeManager should go
unhealthy when state store throws DBException. Contributed by Wilfred
Spiegelenburg.
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 655154c YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.
655154c is described below
commit 655154cb458a8db1d5b35f6400d54d3c8fb72c0c
Author: Miklos Szegedi <sz...@apache.org>
AuthorDate: Tue Jan 2 18:03:04 2018 -0800
YARN-7585. NodeManager should go unhealthy when state store throws DBException. Contributed by Wilfred Spiegelenburg.
(cherry picked from commit 7f515f57ede74dae787994f37bfafd5d20c9aa4c)
---
.../yarn/server/nodemanager/NodeManager.java | 1 +
.../recovery/NMLeveldbStateStoreService.java | 72 ++++++++++++++++++++++
.../nodemanager/recovery/NMStateStoreService.java | 11 ++++
.../recovery/TestNMLeveldbStateStoreService.java | 35 +++++++++++
4 files changed, 119 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 30346e0..a9bc022 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -456,6 +456,7 @@ public class NodeManager extends CompositeService
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
+ nmStore.setNodeStatusUpdater(nodeStatusUpdater);
// Do secure login before calling init for added services.
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 0cbf078..49c2764 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.records.Version;
@@ -158,6 +159,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private DB db;
private boolean isNewlyCreated;
+ private boolean isHealthy;
private Timer compactionTimer;
/**
@@ -172,6 +174,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
protected void startStorage() throws IOException {
+ // Assume that we're healthy when we start
+ isHealthy = true;
}
@Override
@@ -190,6 +194,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return isNewlyCreated;
}
+ /**
+ * If the state store throws an error after recovery has been performed
+ * then we cannot trust it any more to reflect the NM state. We need to
+ * mark the store and node unhealthy.
+ * Errors during the recovery will cause a service failure and thus an NM
+ * start failure. There is no need to mark the store unhealthy for those.
+ * @param dbErr Exception
+ */
+ private void markStoreUnHealthy(DBException dbErr) {
+ // Always log the error here, we might not see the error in the caller
+ LOG.error("Statestore exception: ", dbErr);
+ // We have already been marked unhealthy so no need to do it again.
+ if (!isHealthy) {
+ return;
+ }
+ // Mark unhealthy, an out of band heartbeat will be sent and the state
+ // will remain unhealthy (not recoverable).
+ // No need to close the store: does not make any difference at this point.
+ isHealthy = false;
+ // We could get here before the nodeStatusUpdater is set
+ NodeStatusUpdater nsu = getNodeStatusUpdater();
+ if (nsu != null) {
+ nsu.reportException(dbErr);
+ }
+ }
+
+ @VisibleForTesting
+ boolean isHealthy() {
+ return isHealthy;
+ }
@Override
public List<RecoveredContainerState> loadContainersState()
@@ -362,6 +396,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
db.write(batch);
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -386,6 +421,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), EMPTY_VALUE);
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -401,6 +437,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -416,6 +453,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), EMPTY_VALUE);
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -432,6 +470,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -449,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(diagnostics.toString()));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -467,6 +507,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), EMPTY_VALUE);
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -496,6 +537,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -512,6 +554,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), EMPTY_VALUE);
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -528,6 +571,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(Integer.toString(exitCode)));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -540,6 +584,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -552,6 +597,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(workDir));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -564,6 +610,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(logDir));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -597,6 +644,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -646,6 +694,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), p.toByteArray());
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -667,6 +716,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -823,6 +873,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), proto.toByteArray());
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -846,6 +897,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -869,6 +921,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -934,6 +987,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), taskProto.toByteArray());
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -944,6 +998,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1017,6 +1072,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1031,6 +1087,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(dbKey), pb.getProto().toByteArray());
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1104,6 +1161,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), bytes(expTime.toString()));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1115,6 +1173,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1165,6 +1224,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(key), proto.toByteArray());
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1175,6 +1235,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(key));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1207,6 +1268,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.close();
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
@@ -1370,6 +1432,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(dbkey));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
return;
@@ -1384,6 +1447,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.put(bytes(fullkey), data);
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1395,6 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
try {
db.delete(bytes(fullkey));
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1418,6 +1483,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
candidates.add(key);
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
} finally {
if (iter != null) {
@@ -1431,6 +1497,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
db.delete(bytes(key));
}
} catch (DBException e) {
+ markStoreUnHealthy(e);
throw new IOException(e);
}
}
@@ -1552,6 +1619,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return db;
}
+ @VisibleForTesting
+ void setDB(DB testDb) {
+ this.db = testDb;
+ }
+
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of state-store is a major upgrade, and any
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 350f242..bedf2a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@@ -51,10 +52,20 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Reso
@Unstable
public abstract class NMStateStoreService extends AbstractService {
+ private NodeStatusUpdater nodeStatusUpdater = null;
+
public NMStateStoreService(String name) {
super(name);
}
+ protected NodeStatusUpdater getNodeStatusUpdater() {
+ return nodeStatusUpdater;
+ }
+
+ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
+ this.nodeStatusUpdater = nodeStatusUpdater;
+ }
+
public static class RecoveredApplicationsState {
List<ContainerManagerApplicationProto> applications;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 20c5240..ea2cb2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
@@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestNMLeveldbStateStoreService {
private static final File TMP_DIR = new File(
@@ -1185,6 +1188,38 @@ public class TestNMLeveldbStateStoreService {
resourceMappings.getAssignedResources("numa").equals(numaRes));
}
+ @Test
+ public void testStateStoreNodeHealth() throws IOException {
+ // keep the working DB clean, break a temp DB
+ DB keepDB = stateStore.getDB();
+ DB myMocked = mock(DB.class);
+ stateStore.setDB(myMocked);
+
+ ApplicationId appId = ApplicationId.newInstance(1234, 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ DBException toThrow = new DBException();
+ Mockito.doThrow(toThrow).when(myMocked).
+ put(any(byte[].class), any(byte[].class));
+ // write some data
+ try {
+ // a simple method was chosen; it could be any of the "void" methods
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+ stateStore.storeContainerKilled(containerId);
+ } catch (IOException ioErr) {
+ // Cause should be wrapped DBException
+ assertTrue(ioErr.getCause() instanceof DBException);
+ // check the store is marked unhealthy
+ assertFalse("Statestore should have been unhealthy",
+ stateStore.isHealthy());
+ return;
+ } finally {
+ // restore the working DB
+ stateStore.setDB(keepDB);
+ }
+ Assert.fail("Expected exception not thrown");
+ }
+
private StartContainerRequest storeMockContainer(ContainerId containerId)
throws IOException {
// create a container request
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org