You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/07/14 00:45:17 UTC

[23/50] [abbrv] hadoop git commit: YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S.

YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f1d3d0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f1d3d0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f1d3d0e

Branch: refs/heads/YARN-7402
Commit: 7f1d3d0e9dbe328fae0d43421665e0b6907b33fe
Parents: a47ec5d
Author: Sunil G <su...@apache.org>
Authored: Wed Jul 11 12:26:32 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Wed Jul 11 12:26:32 2018 +0530

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  69 ++++++++----
 .../application/ApplicationImpl.java            |   7 +-
 .../BaseContainerManagerTest.java               |  25 +++++
 .../TestContainerManagerRecovery.java           | 106 +++++++++++++++++--
 4 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 3470910..ad63720 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -1102,24 +1102,8 @@ public class ContainerManagerImpl extends CompositeService implements
           // Create the application
           // populate the flow context from the launch context if the timeline
           // service v.2 is enabled
-          FlowContext flowContext = null;
-          if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-            String flowName = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
-            String flowVersion = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-            String flowRunIdStr = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-            long flowRunId = 0L;
-            if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-              flowRunId = Long.parseLong(flowRunIdStr);
-            }
-            flowContext = new FlowContext(flowName, flowVersion, flowRunId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Flow context: " + flowContext
-                  + " created for an application " + applicationID);
-            }
-          }
+          FlowContext flowContext =
+              getFlowContext(launchContext, applicationID);
 
           Application application =
               new ApplicationImpl(dispatcher, user, flowContext,
@@ -1138,6 +1122,39 @@ public class ContainerManagerImpl extends CompositeService implements
             dispatcher.getEventHandler().handle(new ApplicationInitEvent(
                 applicationID, appAcls, logAggregationContext));
           }
+        } else if (containerTokenIdentifier.getContainerType()
+            == ContainerType.APPLICATION_MASTER) {
+          // The application was set up by an earlier container, possibly of a
+          // previous attempt launched without flow information. Refresh the
+          // flow context from this AM container's launch environment and
+          // persist it so that it survives NM recovery.
+          FlowContext flowContext =
+              getFlowContext(launchContext, applicationID);
+          ApplicationImpl application =
+              (ApplicationImpl) context.getApplications().get(applicationID);
+          if (flowContext == null) {
+            LOG.info("TimelineService V2.0 is not enabled. Skipping updating "
+                + "flowContext for application " + applicationID);
+          } else if (application == null) {
+            // The application may have been removed from the context
+            // concurrently; there is nothing left to update.
+            LOG.warn("Application " + applicationID + " is no longer in the NM"
+                + " context. Skipping updating flowContext " + flowContext);
+          } else {
+            // update flowContext reference in ApplicationImpl
+            application.setFlowContext(flowContext);
+
+            // Required to update state store for recovery.
+            context.getNMStateStore().storeApplication(applicationID,
+                buildAppProto(applicationID, user, credentials,
+                    container.getLaunchContext().getApplicationACLs(),
+                    containerTokenIdentifier.getLogAggregationContext(),
+                    flowContext));
+
+            LOG.info(
+                "Updated application reference with flowContext " + flowContext
+                    + " for app " + applicationID);
+          }
         }
 
         this.context.getNMStateStore().storeContainer(containerId,
@@ -1163,6 +1180,42 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  /**
+   * Builds the timeline service v.2 flow context of an application from the
+   * flow name, flow version and flow run id found in the container launch
+   * environment.
+   *
+   * @param launchContext the container launch context to read flow tags from
+   * @param applicationID the application the context belongs to; used only
+   *          for logging
+   * @return the flow context, or {@code null} if timeline service v.2 is
+   *         not enabled. A missing or empty flow run id is treated as 0.
+   * @throws NumberFormatException if the flow run id is not a valid long
+   */
+  private FlowContext getFlowContext(ContainerLaunchContext launchContext,
+      ApplicationId applicationID) {
+    FlowContext flowContext = null;
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      String flowName = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+      String flowVersion = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+      String flowRunIdStr = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+      long flowRunId = 0L;
+      if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+        flowRunId = Long.parseLong(flowRunIdStr);
+      }
+      flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Flow context: " + flowContext + " created for an application "
+                + applicationID);
+      }
+    }
+    return flowContext;
+  }
+
   protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
       org.apache.hadoop.yarn.api.records.Token token,
       ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6d84fb2..ad995fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +68,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import com.google.common.annotations.VisibleForTesting;
 
 /**
  * The state machine for the representation of an Application
@@ -688,4 +689,19 @@ public class ApplicationImpl implements Application {
   public long getFlowRunId() {
     return flowContext == null ? 0L : flowContext.getFlowRunId();
   }
+
+  /**
+   * Replaces the flow context of this application, e.g. when the AM container
+   * of a later attempt carries flow information that was not available when
+   * the application was first created on this node.
+   *
+   * NOTE(review): getFlowRunId() reads this field without locking; unless the
+   * field is declared volatile, a value set from another thread is not
+   * guaranteed to be visible to readers. Confirm the field declaration.
+   *
+   * @param fc the new flow context; {@code null} clears it
+   */
+  public void setFlowContext(FlowContext fc) {
+    this.flowContext = fc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 93d0afb..b31601c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -429,6 +429,20 @@ public abstract class BaseContainerManagerTest {
   }
 
+  /**
+   * Creates a container token of the given container type, using a default
+   * resource of {@code BuilderUtils.newResource(1024, 1)}.
+   */
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ContainerType containerType)
+      throws IOException {
+    Resource r = BuilderUtils.newResource(1024, 1);
+    return createContainerToken(cId, rmIdentifier, nodeId, user, r,
+        containerTokenSecretManager, logAggregationContext, containerType);
+  }
+
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
       NodeId nodeId, String user, Resource resource,
       NMContainerTokenSecretManager containerTokenSecretManager,
       LogAggregationContext logAggregationContext)
@@ -442,6 +456,25 @@ public abstract class BaseContainerManagerTest {
             containerTokenIdentifier);
   }
 
+  /**
+   * Creates a container token for the given resource whose identifier also
+   * carries the given log aggregation context and container type.
+   */
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ContainerType containerType)
+      throws IOException {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+            System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+            Priority.newInstance(0), 0, logAggregationContext, null,
+            containerType);
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+        containerTokenIdentifier);
+  }
+
   public static Token createContainerToken(ContainerId cId, int version,
       long rmIdentifier, NodeId nodeId, String user, Resource resource,
       NMContainerTokenSecretManager containerTokenSecretManager,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f1d3d0e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index bf8b500..0a834af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -205,7 +207,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
          "includePatternInRollingAggregation",
          "excludePatternInRollingAggregation");
    StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     Application app = context.getApplications().get(appId);
@@ -342,7 +344,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         null, null);
 
     StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, null);
+        clc, null, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     Application app = context.getApplications().get(appId);
@@ -579,7 +581,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.start();
     StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
@@ -595,7 +597,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.start();
     startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     memStore.close();
@@ -612,7 +614,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.start();
     startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     memStore.close();
@@ -661,7 +663,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         localResources, containerEnv, commands, serviceData,
         containerTokens, acls);
     StartContainersResponse startResponse = startContainer(
-        context, cm, cid, clc, null);
+        context, cm, cid, clc, null, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     // make sure the container reaches RUNNING state
@@ -736,14 +738,16 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
-      ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
+      ContainerLaunchContext clc, LogAggregationContext logAggregationContext,
+      ContainerType containerType)
          throws Exception {
     UserGroupInformation user = UserGroupInformation.createRemoteUser(
         cid.getApplicationAttemptId().toString());
     StartContainerRequest scReq = StartContainerRequest.newInstance(
         clc, TestContainerManager.createContainerToken(cid, 0,
             context.getNodeId(), user.getShortUserName(),
-            context.getContainerTokenSecretManager(), logAggregationContext));
+            context.getContainerTokenSecretManager(), logAggregationContext,
+            containerType));
     final List<StartContainerRequest> scReqList =
         new ArrayList<StartContainerRequest>();
     scReqList.add(scReq);
@@ -910,4 +914,92 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     }
   }
 
+  @Test
+  public void testApplicationRecoveryAfterFlowContextUpdated()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+    conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    Context context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+
+    // add an application by starting a container
+    String appName = "app_name1";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    // create 1st attempt container with containerId 2
+    ContainerId cid = ContainerId.newContainerId(attemptId, 2);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    ContainerLaunchContext clc = ContainerLaunchContext
+        .newInstance(localResources, containerEnv, containerCmds, serviceData,
+            containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext = LogAggregationContext
+        .newInstance("includePattern", "excludePattern",
+            "includePatternInRollingAggregation",
+            "excludePatternInRollingAggregation");
+
+    StartContainersResponse startResponse =
+        startContainer(context, cm, cid, clc, logAggregationContext,
+            ContainerType.TASK);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    ApplicationImpl app =
+        (ApplicationImpl) context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.INITING);
+    assertNull(app.getFlowName());
+
+    // 2nd attempt
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(appId, 2);
+    // create 2nd attempt master container
+    ContainerId cid2 = ContainerId.newContainerId(attemptId2, 1);
+    setFlowContext(containerEnv, appName, appId);
+    // once again create for updating launch context
+    clc = ContainerLaunchContext
+        .newInstance(localResources, containerEnv, containerCmds, serviceData,
+            containerTokens, acls);
+    // start container with container type AM.
+    startResponse =
+        startContainer(context, cm, cid2, clc, logAggregationContext,
+            ContainerType.APPLICATION_MASTER);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    waitForAppState(app, ApplicationState.INITING);
+    assertEquals(appName, app.getFlowName());
+
+    // reset container manager and verify flow context information
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = (ApplicationImpl) context.getApplications().get(appId);
+    assertNotNull(app);
+    assertEquals(appName, app.getFlowName());
+    waitForAppState(app, ApplicationState.INITING);
+
+    cm.stop();
+    stateStore.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org