You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:46 UTC
[46/56] [abbrv] hadoop git commit: YARN-8512. ATSv2 entities are not
published to HBase from second attempt onwards. Contributed by Rohith Sharma
K S.
YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f1d3d0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f1d3d0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f1d3d0e
Branch: refs/heads/HDFS-12943
Commit: 7f1d3d0e9dbe328fae0d43421665e0b6907b33fe
Parents: a47ec5d
Author: Sunil G <su...@apache.org>
Authored: Wed Jul 11 12:26:32 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Wed Jul 11 12:26:32 2018 +0530
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 69 ++++++++----
.../application/ApplicationImpl.java | 7 +-
.../BaseContainerManagerTest.java | 25 +++++
.../TestContainerManagerRecovery.java | 106 +++++++++++++++++--
4 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 3470910..ad63720 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -1102,24 +1102,8 @@ public class ContainerManagerImpl extends CompositeService implements
// Create the application
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
- FlowContext flowContext = null;
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- String flowName = launchContext.getEnvironment()
- .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
- String flowVersion = launchContext.getEnvironment()
- .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
- String flowRunIdStr = launchContext.getEnvironment()
- .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
- long flowRunId = 0L;
- if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
- flowRunId = Long.parseLong(flowRunIdStr);
- }
- flowContext = new FlowContext(flowName, flowVersion, flowRunId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Flow context: " + flowContext
- + " created for an application " + applicationID);
- }
- }
+ FlowContext flowContext =
+ getFlowContext(launchContext, applicationID);
Application application =
new ApplicationImpl(dispatcher, user, flowContext,
@@ -1138,6 +1122,31 @@ public class ContainerManagerImpl extends CompositeService implements
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
}
+ } else if (containerTokenIdentifier.getContainerType()
+ == ContainerType.APPLICATION_MASTER) {
+ FlowContext flowContext =
+ getFlowContext(launchContext, applicationID);
+ if (flowContext != null) {
+ ApplicationImpl application =
+ (ApplicationImpl) context.getApplications().get(applicationID);
+
+ // update flowContext reference in ApplicationImpl
+ application.setFlowContext(flowContext);
+
+ // Required to update state store for recovery.
+ context.getNMStateStore().storeApplication(applicationID,
+ buildAppProto(applicationID, user, credentials,
+ container.getLaunchContext().getApplicationACLs(),
+ containerTokenIdentifier.getLogAggregationContext(),
+ flowContext));
+
+ LOG.info(
+ "Updated application reference with flowContext " + flowContext
+ + " for app " + applicationID);
+ } else {
+ LOG.info("TimelineService V2.0 is not enabled. Skipping updating "
+ + "flowContext for application " + applicationID);
+ }
}
this.context.getNMStateStore().storeContainer(containerId,
@@ -1163,6 +1172,30 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ /**
+ * Builds the flow context of an application from the flow name, flow
+ * version and flow run id entries of the container launch environment.
+ *
+ * @param launchContext launch context whose environment is consulted
+ * @param applicationID application id; only used in the debug log message
+ * @return the flow context, or null when timeline service v.2 is not
+ * enabled in this service's configuration
+ */
+ private FlowContext getFlowContext(ContainerLaunchContext launchContext,
+ ApplicationId applicationID) {
+ if (!YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ // nothing to populate without timeline service v.2
+ return null;
+ }
+ String name = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+ String version = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+ String runIdText = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+ // a missing or empty run id falls back to 0
+ long runId = (runIdText == null || runIdText.isEmpty())
+ ? 0L : Long.parseLong(runIdText);
+ FlowContext created = new FlowContext(name, version, runId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Flow context: " + created + " created for an application "
+ + applicationID);
+ }
+ return created;
+ }
+
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
org.apache.hadoop.yarn.api.records.Token token,
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6d84fb2..ad995fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +68,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import com.google.common.annotations.VisibleForTesting;
/**
* The state machine for the representation of an Application
@@ -688,4 +689,8 @@ public class ApplicationImpl implements Application {
public long getFlowRunId() {
return flowContext == null ? 0L : flowContext.getFlowRunId();
}
+
+ /**
+ * Replaces the flow context reference held by this application.
+ *
+ * @param fc the flow context to store
+ */
+ public void setFlowContext(FlowContext fc) {
+ this.flowContext = fc;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 93d0afb..b31601c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -429,6 +429,16 @@ public abstract class BaseContainerManagerTest {
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogAggregationContext logAggregationContext, ContainerType containerType)
+ throws IOException {
+ Resource r = BuilderUtils.newResource(1024, 1);
+ return createContainerToken(cId, rmIdentifier, nodeId, user, r,
+ containerTokenSecretManager, logAggregationContext, containerType);
+ }
+
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext)
@@ -442,6 +452,21 @@ public abstract class BaseContainerManagerTest {
containerTokenIdentifier);
}
+ /**
+ * Creates a container token for the given container, resource and
+ * container type. The token wraps a freshly built
+ * ContainerTokenIdentifier and carries the password that the supplied
+ * secret manager returns for that identifier.
+ *
+ * @param cId id of the container the token is issued for
+ * @param rmIdentifier RM identifier placed into the token identifier
+ * @param nodeId node the token is issued for
+ * @param user user name placed into the token identifier
+ * @param resource resource placed into the token identifier
+ * @param containerTokenSecretManager secret manager providing the password
+ * @param logAggregationContext log aggregation context, passed through
+ * @param containerType type of the container, passed through
+ * @return the container token
+ * @throws IOException declared for callers; see the secret manager
+ */
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user, Resource resource,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogAggregationContext logAggregationContext, ContainerType containerType)
+ throws IOException {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+ System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+ Priority.newInstance(0), 0, logAggregationContext, null,
+ containerType);
+ return BuilderUtils.newContainerToken(nodeId,
+ containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ }
+
public static Token createContainerToken(ContainerId cId, int version,
long rmIdentifier, NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index bf8b500..0a834af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -205,7 +207,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
"includePatternInRollingAggregation",
"excludePatternInRollingAggregation");
StartContainersResponse startResponse = startContainer(context, cm, cid,
- clc, logAggregationContext);
+ clc, logAggregationContext, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@@ -342,7 +344,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
null, null);
StartContainersResponse startResponse = startContainer(context, cm, cid,
- clc, null);
+ clc, null, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@@ -579,7 +581,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.init(conf);
cm.start();
StartContainersResponse startResponse = startContainer(context, cm, cid,
- clc, logAggregationContext);
+ clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
@@ -595,7 +597,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
- clc, logAggregationContext);
+ clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
@@ -612,7 +614,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
- clc, logAggregationContext);
+ clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
@@ -661,7 +663,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
localResources, containerEnv, commands, serviceData,
containerTokens, acls);
StartContainersResponse startResponse = startContainer(
- context, cm, cid, clc, null);
+ context, cm, cid, clc, null, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
// make sure the container reaches RUNNING state
@@ -736,14 +738,15 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
- ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
+ ContainerLaunchContext clc, LogAggregationContext logAggregationContext,
+ ContainerType containerType)
throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser(
cid.getApplicationAttemptId().toString());
StartContainerRequest scReq = StartContainerRequest.newInstance(
clc, TestContainerManager.createContainerToken(cid, 0,
context.getNodeId(), user.getShortUserName(),
- context.getContainerTokenSecretManager(), logAggregationContext));
+ context.getContainerTokenSecretManager(), logAggregationContext, containerType));
final List<StartContainerRequest> scReqList =
new ArrayList<StartContainerRequest>();
scReqList.add(scReq);
@@ -910,4 +913,91 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
}
}
+ /**
+ * Starts a TASK container of the 1st attempt with an empty environment,
+ * then an APPLICATION_MASTER container of the 2nd attempt whose environment
+ * carries the flow context, and checks that the application reports the
+ * flow name both immediately and after the container manager is recreated
+ * on top of the same state store.
+ */
+ @Test
+ public void testApplicationRecoveryAfterFlowContextUpdated()
+ throws Exception {
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+ conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+ conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ Context context = createContext(conf, stateStore);
+ ContainerManagerImpl cm = createContainerManager(context);
+ cm.init(conf);
+ cm.start();
+
+ // add an application by starting a container
+ String appName = "app_name1";
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ // create 1st attempt container with containerId 2
+ ContainerId cid = ContainerId.newContainerId(attemptId, 2);
+ Map<String, LocalResource> localResources = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+
+ List<String> containerCmds = Collections.emptyList();
+ Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+ Credentials containerCreds = new Credentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ containerCreds.writeTokenStorageToStream(dob);
+ ByteBuffer containerTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ Map<ApplicationAccessType, String> acls =
+ new HashMap<ApplicationAccessType, String>();
+ ContainerLaunchContext clc = ContainerLaunchContext
+ .newInstance(localResources, containerEnv, containerCmds, serviceData,
+ containerTokens, acls);
+ // create the logAggregationContext
+ LogAggregationContext logAggregationContext = LogAggregationContext
+ .newInstance("includePattern", "excludePattern",
+ "includePatternInRollingAggregation",
+ "excludePatternInRollingAggregation");
+
+ StartContainersResponse startResponse =
+ startContainer(context, cm, cid, clc, logAggregationContext,
+ ContainerType.TASK);
+ assertTrue(startResponse.getFailedRequests().isEmpty());
+ assertEquals(1, context.getApplications().size());
+ ApplicationImpl app =
+ (ApplicationImpl) context.getApplications().get(appId);
+ assertNotNull(app);
+ waitForAppState(app, ApplicationState.INITING);
+ // the 1st attempt's environment carried no flow information
+ assertNull(app.getFlowName());
+
+ // 2nd attempt
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId, 2);
+ // create 2nd attempt master container; it must be keyed by the 2nd
+ // attempt's id rather than reusing the 1st attempt's
+ ContainerId cid2 = ContainerId.newContainerId(attemptId2, 1);
+ setFlowContext(containerEnv, appName, appId);
+ // once again create for updating launch context
+ clc = ContainerLaunchContext
+ .newInstance(localResources, containerEnv, containerCmds, serviceData,
+ containerTokens, acls);
+ // start container with container type AM.
+ startResponse =
+ startContainer(context, cm, cid2, clc, logAggregationContext,
+ ContainerType.APPLICATION_MASTER);
+ assertTrue(startResponse.getFailedRequests().isEmpty());
+ assertEquals(1, context.getApplications().size());
+ waitForAppState(app, ApplicationState.INITING);
+ assertEquals(appName, app.getFlowName());
+
+ // reset container manager and verify flow context information
+ cm.stop();
+ context = createContext(conf, stateStore);
+ cm = createContainerManager(context);
+ cm.init(conf);
+ cm.start();
+ assertEquals(1, context.getApplications().size());
+ app = (ApplicationImpl) context.getApplications().get(appId);
+ assertNotNull(app);
+ assertEquals(appName, app.getFlowName());
+ waitForAppState(app, ApplicationState.INITING);
+
+ cm.stop();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org