You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/03 06:58:07 UTC

[30/50] [abbrv] hadoop git commit: YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)

YARN-6555. Store application flow context in NM state store for work-preserving restart. (Rohith Sharma K S via Haibo Chen)

(cherry picked from commit 47474fffac085e0e5ea46336bf80ccd0677017a3)
(cherry picked from commit 8817cb5c8424359b880c6d700e53092f0269c1bb)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/303d7e0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/303d7e0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/303d7e0a

Branch: refs/heads/YARN-5355_branch2
Commit: 303d7e0a284544b13d5ea04ef699823d31b7933e
Parents: f1f7d65
Author: Haibo Chen <ha...@apache.org>
Authored: Thu May 25 21:15:27 2017 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Thu May 25 21:38:58 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  | 71 +++++++++++++-------
 .../application/ApplicationImpl.java            | 27 ++++++--
 .../yarn_server_nodemanager_recovery.proto      |  7 ++
 .../TestContainerManagerRecovery.java           | 40 +++++++++--
 4 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/303d7e0a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 1d822fe..a9d5f47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -384,10 +385,20 @@ public class ContainerManagerImpl extends CompositeService implements
           new LogAggregationContextPBImpl(p.getLogAggregationContext());
     }
 
+    FlowContext fc = null;
+    if (p.getFlowContext() != null) {
+      FlowContextProto fcp = p.getFlowContext();
+      fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
+          fcp.getFlowRunId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Recovering Flow context: " + fc + " for an application " + appId);
+      }
+    }
+
     LOG.info("Recovering application " + appId);
-    //TODO: Recover flow and flow run ID
-    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
-        creds, context, p.getAppLogAggregationInitedTime());
+    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
+        appId, creds, context, p.getAppLogAggregationInitedTime());
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -941,7 +952,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
       String user, Credentials credentials,
       Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext) {
+      LogAggregationContext logAggregationContext, FlowContext flowContext) {
 
     ContainerManagerApplicationProto.Builder builder =
         ContainerManagerApplicationProto.newBuilder();
@@ -976,6 +987,16 @@ public class ContainerManagerImpl extends CompositeService implements
       }
     }
 
+    builder.clearFlowContext();
+    if (flowContext != null && flowContext.getFlowName() != null
+        && flowContext.getFlowVersion() != null) {
+      FlowContextProto fcp =
+          FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
+              .setFlowVersion(flowContext.getFlowVersion())
+              .setFlowRunId(flowContext.getFlowRunId()).build();
+      builder.setFlowContext(fcp);
+    }
+
     return builder.build();
   }
 
@@ -1022,25 +1043,29 @@ public class ContainerManagerImpl extends CompositeService implements
     this.readLock.lock();
     try {
       if (!isServiceStopped()) {
-        // Create the application
-        // populate the flow context from the launch context if the timeline
-        // service v.2 is enabled
-        FlowContext flowContext = null;
-        if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-          String flowName = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_NAME_TAG_PREFIX);
-          String flowVersion = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-          String flowRunIdStr = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-          long flowRunId = 0L;
-          if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-            flowRunId = Long.parseLong(flowRunIdStr);
-          }
-          flowContext =
-              new FlowContext(flowName, flowVersion, flowRunId);
-        }
         if (!context.getApplications().containsKey(applicationID)) {
+          // Create the application
+          // populate the flow context from the launch context if the timeline
+          // service v.2 is enabled
+          FlowContext flowContext = null;
+          if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+            String flowName = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+            String flowVersion = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+            String flowRunIdStr = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+            long flowRunId = 0L;
+            if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+              flowRunId = Long.parseLong(flowRunIdStr);
+            }
+            flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Flow context: " + flowContext
+                  + " created for an application " + applicationID);
+            }
+          }
+
           Application application =
               new ApplicationImpl(dispatcher, user, flowContext,
                   applicationID, credentials, context);
@@ -1054,7 +1079,7 @@ public class ContainerManagerImpl extends CompositeService implements
                 container.getLaunchContext().getApplicationACLs();
             context.getNMStateStore().storeApplication(applicationID,
                 buildAppProto(applicationID, user, credentials, appAcls,
-                    logAggregationContext));
+                    logAggregationContext, flowContext));
             dispatcher.getEventHandler().handle(new ApplicationInitEvent(
                 applicationID, appAcls, logAggregationContext));
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/303d7e0a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 4e14eb0..aafb8d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -107,13 +108,6 @@ public class ApplicationImpl implements Application {
   }
 
   public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId, Credentials credentials, Context context,
-      long recoveredLogInitedTime) {
-    this(dispatcher, user, null, appId, credentials, context,
-      recoveredLogInitedTime);
-  }
-
-  public ApplicationImpl(Dispatcher dispatcher, String user,
       FlowContext flowContext, ApplicationId appId, Credentials credentials,
       Context context, long recoveredLogInitedTime) {
     this.dispatcher = dispatcher;
@@ -172,6 +166,15 @@ public class ApplicationImpl implements Application {
     public long getFlowRunId() {
       return flowRunId;
     }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("{");
+      sb.append("Flow Name=").append(getFlowName());
+      sb.append(" Flow Versioin=").append(getFlowVersion());
+      sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
+      return sb.toString();
+    }
   }
 
   @Override
@@ -391,6 +394,16 @@ public class ApplicationImpl implements Application {
 
     builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
 
+    builder.clearFlowContext();
+    if (app.flowContext != null && app.flowContext.getFlowName() != null
+        && app.flowContext.getFlowVersion() != null) {
+      FlowContextProto fcp = FlowContextProto.newBuilder()
+          .setFlowName(app.flowContext.getFlowName())
+          .setFlowVersion(app.flowContext.getFlowVersion())
+          .setFlowRunId(app.flowContext.getFlowRunId()).build();
+      builder.setFlowContext(fcp);
+    }
+
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/303d7e0a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 0dfa20e..7831711 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -31,6 +31,7 @@ message ContainerManagerApplicationProto {
   repeated ApplicationACLMapProto acls = 4;
   optional LogAggregationContextProto log_aggregation_context = 5;
   optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
+  optional FlowContextProto flowContext = 7;
 }
 
 message DeletionServiceDeleteTaskProto {
@@ -52,3 +53,9 @@ message LogDeleterProto {
   optional string user = 1;
   optional int64 deletionTime = 2;
 }
+
+message FlowContextProto {
+  optional string flowName = 1;
+  optional string flowVersion = 2;
+  optional int64 flowRunId = 3;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/303d7e0a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 36db50d..85d9639 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -136,6 +137,11 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+
+    // enable atsv2 by default in test
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+
     // Default delSrvc
     delSrvc = createDeletionService();
     delSrvc.init(conf);
@@ -144,6 +150,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     nodeHealthChecker = new NodeHealthCheckerService(
         NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
     nodeHealthChecker.init(conf);
+
   }
 
   @Test
@@ -161,6 +168,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.start();
 
     // add an application by starting a container
+    String appName = "app_name1";
     String appUser = "app_user1";
     String modUser = "modify_user1";
     String viewUser = "view_user1";
@@ -170,7 +178,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, appName, appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
@@ -318,7 +327,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
 
@@ -399,7 +409,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     ApplicationAttemptId attemptId =
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
     DataOutputBuffer dob = new DataOutputBuffer();
@@ -475,7 +486,8 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
@@ -768,4 +780,24 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
           }
     };
   }
+
+  private void setFlowContext(Map<String, String> containerEnv, String appName,
+      ApplicationId appId) {
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      setFlowTags(containerEnv, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+          TimelineUtils.generateDefaultFlowName(appName, appId));
+      setFlowTags(containerEnv, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+          TimelineUtils.DEFAULT_FLOW_VERSION);
+      setFlowTags(containerEnv, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+          String.valueOf(System.currentTimeMillis()));
+    }
+  }
+
+  private static void setFlowTags(Map<String, String> environment,
+      String tagPrefix, String value) {
+    if (!value.isEmpty()) {
+      environment.put(tagPrefix, value);
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org