You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/30 20:15:46 UTC
[21/50] [abbrv] hadoop git commit: YARN-6555. Store application flow
context in NM state store for work-preserving restart. (Rohith Sharma K S via
Haibo Chen)
YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)
(cherry picked from commit 47474fffac085e0e5ea46336bf80ccd0677017a3)
(cherry picked from commit 8817cb5c8424359b880c6d700e53092f0269c1bb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/75c2a5b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/75c2a5b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/75c2a5b0
Branch: refs/heads/YARN-5355_branch2
Commit: 75c2a5b0c99d623db162d149dff2fe1a46987d8f
Parents: b407be6
Author: Haibo Chen <ha...@apache.org>
Authored: Thu May 25 21:15:27 2017 -0700
Committer: Varun Saxena <va...@apache.org>
Committed: Thu Aug 31 01:10:04 2017 +0530
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 71 +++++++++++++-------
.../application/ApplicationImpl.java | 27 ++++++--
.../yarn_server_nodemanager_recovery.proto | 7 ++
.../TestContainerManagerRecovery.java | 40 +++++++++--
4 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75c2a5b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 2585a30..5b3edce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -392,10 +393,20 @@ public class ContainerManagerImpl extends CompositeService implements
new LogAggregationContextPBImpl(p.getLogAggregationContext());
}
+ // Generated protobuf getters never return null for an unset message field
+ // (they return the default instance), so presence has to be tested with
+ // hasFlowContext(). A null check here would hand every recovered
+ // application an empty FlowContext even when none was stored.
+ FlowContext fc = null;
+ if (p.hasFlowContext()) {
+ FlowContextProto fcp = p.getFlowContext();
+ fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
+ fcp.getFlowRunId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recovering Flow context: " + fc + " for an application " + appId);
+ }
+ }
+
LOG.info("Recovering application " + appId);
- //TODO: Recover flow and flow run ID
- ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
- creds, context, p.getAppLogAggregationInitedTime());
+ ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
+ appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@@ -971,7 +982,7 @@ public class ContainerManagerImpl extends CompositeService implements
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls,
- LogAggregationContext logAggregationContext) {
+ LogAggregationContext logAggregationContext, FlowContext flowContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
@@ -1006,6 +1017,16 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ builder.clearFlowContext();
+ if (flowContext != null && flowContext.getFlowName() != null
+ && flowContext.getFlowVersion() != null) {
+ FlowContextProto fcp =
+ FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
+ .setFlowVersion(flowContext.getFlowVersion())
+ .setFlowRunId(flowContext.getFlowRunId()).build();
+ builder.setFlowContext(fcp);
+ }
+
return builder.build();
}
@@ -1058,25 +1079,29 @@ public class ContainerManagerImpl extends CompositeService implements
this.readLock.lock();
try {
if (!isServiceStopped()) {
- // Create the application
- // populate the flow context from the launch context if the timeline
- // service v.2 is enabled
- FlowContext flowContext = null;
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- String flowName = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_NAME_TAG_PREFIX);
- String flowVersion = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_VERSION_TAG_PREFIX);
- String flowRunIdStr = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
- long flowRunId = 0L;
- if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
- flowRunId = Long.parseLong(flowRunIdStr);
- }
- flowContext =
- new FlowContext(flowName, flowVersion, flowRunId);
- }
if (!context.getApplications().containsKey(applicationID)) {
+ // Create the application
+ // populate the flow context from the launch context if the timeline
+ // service v.2 is enabled
+ FlowContext flowContext = null;
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ String flowName = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+ String flowVersion = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+ String flowRunIdStr = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+ long flowRunId = 0L;
+ if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+ flowRunId = Long.parseLong(flowRunIdStr);
+ }
+ flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flow context: " + flowContext
+ + " created for an application " + applicationID);
+ }
+ }
+
Application application =
new ApplicationImpl(dispatcher, user, flowContext,
applicationID, credentials, context);
@@ -1090,7 +1115,7 @@ public class ContainerManagerImpl extends CompositeService implements
container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls,
- logAggregationContext));
+ logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75c2a5b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 14c8c46a..3ff2ada 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -108,13 +109,6 @@ public class ApplicationImpl implements Application {
}
public ApplicationImpl(Dispatcher dispatcher, String user,
- ApplicationId appId, Credentials credentials, Context context,
- long recoveredLogInitedTime) {
- this(dispatcher, user, null, appId, credentials, context,
- recoveredLogInitedTime);
- }
-
- public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
@@ -173,6 +167,15 @@ public class ApplicationImpl implements Application {
public long getFlowRunId() {
return flowRunId;
}
+
+ /**
+ * Renders the flow name, flow version and flow run id as a single
+ * brace-delimited string.
+ *
+ * @return the textual form of the three flow attributes
+ */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{");
+ sb.append("Flow Name=").append(getFlowName());
+ // Label was previously misspelled as "Flow Versioin".
+ sb.append(" Flow Version=").append(getFlowVersion());
+ sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
+ return sb.toString();
+ }
}
@Override
@@ -392,6 +395,16 @@ public class ApplicationImpl implements Application {
builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
+ builder.clearFlowContext();
+ if (app.flowContext != null && app.flowContext.getFlowName() != null
+ && app.flowContext.getFlowVersion() != null) {
+ FlowContextProto fcp = FlowContextProto.newBuilder()
+ .setFlowName(app.flowContext.getFlowName())
+ .setFlowVersion(app.flowContext.getFlowVersion())
+ .setFlowRunId(app.flowContext.getFlowRunId()).build();
+ builder.setFlowContext(fcp);
+ }
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75c2a5b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index ecb9835..7212953 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -31,6 +31,7 @@ message ContainerManagerApplicationProto {
repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
+ optional FlowContextProto flowContext = 7;
}
message DeletionServiceDeleteTaskProto {
@@ -53,3 +54,9 @@ message LogDeleterProto {
optional string user = 1;
optional int64 deletionTime = 2;
}
+
+// Flow name, flow version and flow run id of an application. Embedded as the
+// optional flowContext field (tag 7) of ContainerManagerApplicationProto.
+message FlowContextProto {
+ optional string flowName = 1;
+ optional string flowVersion = 2;
+ optional int64 flowRunId = 3;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/75c2a5b0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index e4ac8b4..6769d6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Before;
import org.junit.Test;
@@ -136,6 +137,11 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+
+ // enable atsv2 by default in test
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+
// Default delSrvc
delSrvc = createDeletionService();
delSrvc.init(conf);
@@ -144,6 +150,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
nodeHealthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
nodeHealthChecker.init(conf);
+
}
@Test
@@ -161,6 +168,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.start();
// add an application by starting a container
+ String appName = "app_name1";
String appUser = "app_user1";
String modUser = "modify_user1";
String viewUser = "view_user1";
@@ -170,7 +178,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, appName, appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@@ -318,7 +327,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
@@ -399,7 +409,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
@@ -475,7 +486,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@@ -757,4 +769,24 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
}
};
}
+
+ /**
+ * Adds the flow name, flow version and flow run id tags to the given
+ * container environment. Does nothing unless timeline service v2 is
+ * enabled in conf.
+ */
+ private void setFlowContext(Map<String, String> containerEnv, String appName,
+ ApplicationId appId) {
+ if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ return;
+ }
+ String defaultFlowName =
+ TimelineUtils.generateDefaultFlowName(appName, appId);
+ setFlowTags(containerEnv, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+ defaultFlowName);
+ setFlowTags(containerEnv, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+ TimelineUtils.DEFAULT_FLOW_VERSION);
+ // The run id is simply the wall-clock time at which the tag is created.
+ String runIdNow = String.valueOf(System.currentTimeMillis());
+ setFlowTags(containerEnv, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, runIdNow);
+ }
+
+ /**
+ * Stores value in environment under the key tagPrefix; empty values are
+ * skipped so that no blank tag is written.
+ */
+ private static void setFlowTags(Map<String, String> environment,
+ String tagPrefix, String value) {
+ if (value.isEmpty()) {
+ return;
+ }
+ environment.put(tagPrefix, value);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org