You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this version.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2017/05/26 04:19:22 UTC
hadoop git commit: YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)
Repository: hadoop
Updated Branches:
refs/heads/trunk 2b5ad4876 -> 47474fffa
YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47474fff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47474fff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47474fff
Branch: refs/heads/trunk
Commit: 47474fffac085e0e5ea46336bf80ccd0677017a3
Parents: 2b5ad48
Author: Haibo Chen <ha...@apache.org>
Authored: Thu May 25 21:15:27 2017 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Thu May 25 21:15:27 2017 -0700
----------------------------------------------------------------------
.../containermanager/ContainerManagerImpl.java | 71 +++++++++++++-------
.../application/ApplicationImpl.java | 27 ++++++--
.../yarn_server_nodemanager_recovery.proto | 7 ++
.../TestContainerManagerRecovery.java | 40 +++++++++--
4 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index f65f1ac..50268b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -381,10 +382,20 @@ public class ContainerManagerImpl extends CompositeService implements
new LogAggregationContextPBImpl(p.getLogAggregationContext());
}
+ FlowContext fc = null;
+ if (p.getFlowContext() != null) {
+ FlowContextProto fcp = p.getFlowContext();
+ fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
+ fcp.getFlowRunId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Recovering Flow context: " + fc + " for an application " + appId);
+ }
+ }
+
LOG.info("Recovering application " + appId);
- //TODO: Recover flow and flow run ID
- ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
- creds, context, p.getAppLogAggregationInitedTime());
+ ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
+ appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@@ -936,7 +947,7 @@ public class ContainerManagerImpl extends CompositeService implements
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls,
- LogAggregationContext logAggregationContext) {
+ LogAggregationContext logAggregationContext, FlowContext flowContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
@@ -971,6 +982,16 @@ public class ContainerManagerImpl extends CompositeService implements
}
}
+ builder.clearFlowContext();
+ if (flowContext != null && flowContext.getFlowName() != null
+ && flowContext.getFlowVersion() != null) {
+ FlowContextProto fcp =
+ FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
+ .setFlowVersion(flowContext.getFlowVersion())
+ .setFlowRunId(flowContext.getFlowRunId()).build();
+ builder.setFlowContext(fcp);
+ }
+
return builder.build();
}
@@ -1016,25 +1037,29 @@ public class ContainerManagerImpl extends CompositeService implements
this.readLock.lock();
try {
if (!isServiceStopped()) {
- // Create the application
- // populate the flow context from the launch context if the timeline
- // service v.2 is enabled
- FlowContext flowContext = null;
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- String flowName = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_NAME_TAG_PREFIX);
- String flowVersion = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_VERSION_TAG_PREFIX);
- String flowRunIdStr = launchContext.getEnvironment().get(
- TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
- long flowRunId = 0L;
- if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
- flowRunId = Long.parseLong(flowRunIdStr);
- }
- flowContext =
- new FlowContext(flowName, flowVersion, flowRunId);
- }
if (!context.getApplications().containsKey(applicationID)) {
+ // Create the application
+ // populate the flow context from the launch context if the timeline
+ // service v.2 is enabled
+ FlowContext flowContext = null;
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ String flowName = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+ String flowVersion = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+ String flowRunIdStr = launchContext.getEnvironment()
+ .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+ long flowRunId = 0L;
+ if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+ flowRunId = Long.parseLong(flowRunIdStr);
+ }
+ flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flow context: " + flowContext
+ + " created for an application " + applicationID);
+ }
+ }
+
Application application =
new ApplicationImpl(dispatcher, user, flowContext,
applicationID, credentials, context);
@@ -1048,7 +1073,7 @@ public class ContainerManagerImpl extends CompositeService implements
container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls,
- logAggregationContext));
+ logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 444581c..80863a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -106,13 +107,6 @@ public class ApplicationImpl implements Application {
}
public ApplicationImpl(Dispatcher dispatcher, String user,
- ApplicationId appId, Credentials credentials, Context context,
- long recoveredLogInitedTime) {
- this(dispatcher, user, null, appId, credentials, context,
- recoveredLogInitedTime);
- }
-
- public ApplicationImpl(Dispatcher dispatcher, String user,
FlowContext flowContext, ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
@@ -171,6 +165,15 @@ public class ApplicationImpl implements Application {
public long getFlowRunId() {
return flowRunId;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{");
+ sb.append("Flow Name=").append(getFlowName());
+ sb.append(" Flow Versioin=").append(getFlowVersion());
+ sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
+ return sb.toString();
+ }
}
@Override
@@ -390,6 +393,16 @@ public class ApplicationImpl implements Application {
builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
+ builder.clearFlowContext();
+ if (app.flowContext != null && app.flowContext.getFlowName() != null
+ && app.flowContext.getFlowVersion() != null) {
+ FlowContextProto fcp = FlowContextProto.newBuilder()
+ .setFlowName(app.flowContext.getFlowName())
+ .setFlowVersion(app.flowContext.getFlowVersion())
+ .setFlowRunId(app.flowContext.getFlowRunId()).build();
+ builder.setFlowContext(fcp);
+ }
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 0dfa20e..7831711 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -31,6 +31,7 @@ message ContainerManagerApplicationProto {
repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
+ optional FlowContextProto flowContext = 7;
}
message DeletionServiceDeleteTaskProto {
@@ -52,3 +53,9 @@ message LogDeleterProto {
optional string user = 1;
optional int64 deletionTime = 2;
}
+
+message FlowContextProto {
+ optional string flowName = 1;
+ optional string flowVersion = 2;
+ optional int64 flowRunId = 3;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 633bb6d..075d857 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Before;
import org.junit.Test;
@@ -136,6 +137,11 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+
+ // enable atsv2 by default in test
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+
// Default delSrvc
delSrvc = createDeletionService();
delSrvc.init(conf);
@@ -144,6 +150,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
nodeHealthChecker = new NodeHealthCheckerService(
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
nodeHealthChecker.init(conf);
+
}
@Test
@@ -161,6 +168,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.start();
// add an application by starting a container
+ String appName = "app_name1";
String appUser = "app_user1";
String modUser = "modify_user1";
String viewUser = "view_user1";
@@ -170,7 +178,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, appName, appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@@ -318,7 +327,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
@@ -400,7 +410,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
@@ -476,7 +487,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
- Map<String, String> containerEnv = Collections.emptyMap();
+ Map<String, String> containerEnv = new HashMap<>();
+ setFlowContext(containerEnv, "app_name1", appId);
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
@@ -760,4 +772,24 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
containerManager.dispatcher.disableExitOnDispatchException();
return containerManager;
}
+
+ private void setFlowContext(Map<String, String> containerEnv, String appName,
+ ApplicationId appId) {
+ if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+ setFlowTags(containerEnv, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+ TimelineUtils.generateDefaultFlowName(appName, appId));
+ setFlowTags(containerEnv, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+ TimelineUtils.DEFAULT_FLOW_VERSION);
+ setFlowTags(containerEnv, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+ String.valueOf(System.currentTimeMillis()));
+ }
+ }
+
+ private static void setFlowTags(Map<String, String> environment,
+ String tagPrefix, String value) {
+ if (!value.isEmpty()) {
+ environment.put(tagPrefix, value);
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org