You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/07/23 18:08:24 UTC

hadoop git commit: YARN-6966. NodeManager metrics may return wrong negative values when NM restart. (Szilard Nemeth via Haibo Chen)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 3a9e25edf -> 9d3c39e9d


YARN-6966. NodeManager metrics may return wrong negative values when NM restart. (Szilard Nemeth via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d3c39e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d3c39e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d3c39e9

Branch: refs/heads/trunk
Commit: 9d3c39e9dd88b8f32223c01328581bb68507d415
Parents: 3a9e25e
Author: Haibo Chen <ha...@apache.org>
Authored: Mon Jul 23 11:06:44 2018 -0700
Committer: Haibo Chen <ha...@apache.org>
Committed: Mon Jul 23 11:07:24 2018 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  2 +-
 .../scheduler/ContainerScheduler.java           | 16 ++++--
 .../recovery/NMLeveldbStateStoreService.java    | 32 ++++++-----
 .../recovery/NMNullStateStoreService.java       |  2 +-
 .../recovery/NMStateStoreService.java           |  3 +-
 .../BaseContainerManagerTest.java               |  2 +-
 .../TestContainerManagerRecovery.java           | 57 ++++++++++++++++++++
 .../TestContainerSchedulerRecovery.java         | 46 +++++++++++-----
 .../metrics/TestNodeManagerMetrics.java         |  4 +-
 .../recovery/NMMemoryStateStoreService.java     | 16 +++++-
 .../TestNMLeveldbStateStoreService.java         | 21 +++++++-
 11 files changed, 163 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index ad63720..89bef8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -496,7 +496,7 @@ public class ContainerManagerImpl extends CompositeService implements
     Container container = new ContainerImpl(getConfig(), dispatcher,
         launchContext, credentials, metrics, token, context, rcs);
     context.getContainers().put(token.getContainerID(), container);
-    containerScheduler.recoverActiveContainer(container, rcs.getStatus());
+    containerScheduler.recoverActiveContainer(container, rcs);
     app.handle(new ApplicationContainerInitEvent(container));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 5cdcf41..a61b9d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -44,6 +44,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 
 
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
+        .RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -259,11 +262,11 @@ public class ContainerScheduler extends AbstractService implements
    * @param rcs Recovered Container status
    */
   public void recoverActiveContainer(Container container,
-      RecoveredContainerStatus rcs) {
+      RecoveredContainerState rcs) {
     ExecutionType execType =
         container.getContainerTokenIdentifier().getExecutionType();
-    if (rcs == RecoveredContainerStatus.QUEUED
-        || rcs == RecoveredContainerStatus.PAUSED) {
+    if (rcs.getStatus() == RecoveredContainerStatus.QUEUED
+        || rcs.getStatus() == RecoveredContainerStatus.PAUSED) {
       if (execType == ExecutionType.GUARANTEED) {
         queuedGuaranteedContainers.put(container.getContainerId(), container);
       } else if (execType == ExecutionType.OPPORTUNISTIC) {
@@ -274,10 +277,15 @@ public class ContainerScheduler extends AbstractService implements
             "UnKnown execution type received " + container.getContainerId()
                 + ", execType " + execType);
       }
-    } else if (rcs == RecoveredContainerStatus.LAUNCHED) {
+    } else if (rcs.getStatus() == RecoveredContainerStatus.LAUNCHED) {
       runningContainers.put(container.getContainerId(), container);
       utilizationTracker.addContainerResources(container);
     }
+    if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED
+            && rcs.getCapability() != null) {
+      metrics.launchedContainer();
+      metrics.allocateContainer(rcs.getCapability());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 6f643b0..44f5e18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -237,7 +238,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       iter.seek(bytes(CONTAINERS_KEY_PREFIX));
 
       while (iter.hasNext()) {
-        Entry<byte[],byte[]> entry = iter.peekNext();
+        Entry<byte[], byte[]> entry = iter.peekNext();
         String key = asString(entry.getKey());
         if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
           break;
@@ -299,6 +300,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
         rcs.startRequest = new StartContainerRequestPBImpl(
             StartContainerRequestProto.parseFrom(entry.getValue()));
+        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+            .newContainerTokenIdentifier(rcs.startRequest.getContainerToken());
+        rcs.capability = new ResourcePBImpl(
+            containerTokenIdentifier.getProto().getResource());
       } else if (suffix.equals(CONTAINER_VERSION_KEY_SUFFIX)) {
         rcs.version = Integer.parseInt(asString(entry.getValue()));
       } else if (suffix.equals(CONTAINER_START_TIME_KEY_SUFFIX)) {
@@ -382,24 +387,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       LOG.debug("storeContainer: containerId= " + idStr
           + ", startRequest= " + startRequest);
     }
-    String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
-    String keyVersion = getContainerVersionKey(idStr);
-    String keyStartTime =
+    final String keyVersion = getContainerVersionKey(idStr);
+    final String keyRequest =
+        getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX);
+    final StartContainerRequestProto startContainerRequest =
+        ((StartContainerRequestPBImpl) startRequest).getProto();
+
+    final String keyStartTime =
         getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX);
+    final String startTimeValue = Long.toString(startTime);
+
     try {
-      WriteBatch batch = db.createWriteBatch();
-      try {
-        batch.put(bytes(keyRequest),
-            ((StartContainerRequestPBImpl) startRequest).getProto().
-                toByteArray());
-        batch.put(bytes(keyStartTime), bytes(Long.toString(startTime)));
+      try (WriteBatch batch = db.createWriteBatch()) {
+        batch.put(bytes(keyRequest), startContainerRequest.toByteArray());
+        batch.put(bytes(keyStartTime), bytes(startTimeValue));
         if (containerVersion != 0) {
           batch.put(bytes(keyVersion),
-              bytes(Integer.toString(containerVersion)));
+                  bytes(Integer.toString(containerVersion)));
         }
         db.write(batch);
-      } finally {
-        batch.close();
       }
     } catch (DBException e) {
       markStoreUnHealthy(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index f217f2f..dfad9cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -73,7 +73,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
 
   @Override
   public void storeContainer(ContainerId containerId, int version,
-      long startTime, StartContainerRequest startRequest) throws IOException {
+      long startTime, StartContainerRequest startRequest) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 0ea0ef3..70decdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -416,7 +416,8 @@ public abstract class NMStateStoreService extends AbstractService {
    * @throws IOException
    */
   public abstract void storeContainer(ContainerId containerId,
-      int containerVersion, long startTime, StartContainerRequest startRequest)
+          int containerVersion, long startTime,
+          StartContainerRequest startRequest)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index b31601c..493aa4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -107,7 +107,7 @@ public abstract class BaseContainerManagerTest {
   protected static File remoteLogDir;
   protected static File tmpDir;
 
-  protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+  protected NodeManagerMetrics metrics = NodeManagerMetrics.create();
 
   public BaseContainerManagerTest() throws UnsupportedFileSystemException {
     localFS = FileContext.getLocalFSFileContext();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 0a834af..a144adf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -106,6 +107,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -401,6 +403,61 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
   }
 
   @Test
+  public void testNodeManagerMetricsRecovery() throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    Context context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    cm.init(conf);
+    cm.start();
+    metrics.addResource(Resource.newInstance(10240, 8));
+
+    // add an application by starting a container
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    List<String> commands = Arrays.asList("sleep 60s".split(" "));
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, commands, serviceData,
+        null, null);
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc, null, ContainerType.TASK);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    // make sure the container reaches RUNNING state
+    waitForNMContainerState(cm, cid,
+        org.apache.hadoop.yarn.server.nodemanager
+            .containermanager.container.ContainerState.RUNNING);
+    TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
+
+    // restart and verify metrics could be recovered
+    cm.stop();
+    DefaultMetricsSystem.shutdown();
+    metrics = NodeManagerMetrics.create();
+    metrics.addResource(Resource.newInstance(10240, 8));
+    TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8);
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context, delSrvc);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+    TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7);
+    cm.stop();
+  }
+
+  @Test
   public void testContainerResizeRecovery() throws Exception {
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
index 2ae8b97..6b3ac67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerRecovery.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doNothing;
@@ -31,6 +32,8 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
+        .RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.junit.After;
 import org.junit.Before;
@@ -71,6 +74,13 @@ public class TestContainerSchedulerRecovery {
 
   private ContainerScheduler spy;
 
+  private RecoveredContainerState createRecoveredContainerState(
+      RecoveredContainerStatus status) {
+    RecoveredContainerState mockState = mock(RecoveredContainerState.class);
+    when(mockState.getStatus()).thenReturn(status);
+    return mockState;
+  }
+
   @Before public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     spy = spy(tempContainerScheduler);
@@ -94,7 +104,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
     when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -113,7 +124,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
     when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -132,7 +144,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+    RecoveredContainerState rcs =
+        createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
     when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -151,7 +164,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
     when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -170,7 +184,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
     when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -189,7 +204,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.LAUNCHED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.LAUNCHED);
     when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -208,7 +224,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
     when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -227,7 +244,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.REQUESTED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.REQUESTED);
     when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -246,7 +264,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
     when(token.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -265,7 +284,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.COMPLETED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.COMPLETED);
     when(token.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
@@ -284,7 +304,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.QUEUED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.QUEUED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
@@ -302,7 +323,8 @@ public class TestContainerSchedulerRecovery {
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());
     assertEquals(0, spy.getNumQueuedOpportunisticContainers());
     assertEquals(0, spy.getNumRunningContainers());
-    RecoveredContainerStatus rcs = RecoveredContainerStatus.PAUSED;
+    RecoveredContainerState rcs =
+            createRecoveredContainerState(RecoveredContainerStatus.PAUSED);
     when(container.getContainerTokenIdentifier()).thenReturn(token);
     spy.recoverActiveContainer(container, rcs);
     assertEquals(0, spy.getNumQueuedGuaranteedContainers());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
index d21e7ad..c5f80ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java
@@ -113,8 +113,8 @@ public class TestNodeManagerMetrics {
     assertGauge("AvailableVCores", 19, rb);
   }
 
-  private void checkMetrics(int launched, int completed, int failed, int killed,
-      int initing, int running, int allocatedGB,
+  public static void checkMetrics(int launched, int completed, int failed,
+      int killed, int initing, int running, int allocatedGB,
       int allocatedContainers, int availableGB, int allocatedVCores,
       int availableVCores) {
     MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index b67d11f..c5428d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 
+
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<ApplicationId, ContainerManagerApplicationProto> apps;
   private Map<ContainerId, RecoveredContainerState> containerStates;
@@ -132,11 +136,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
   @Override
   public synchronized void storeContainer(ContainerId containerId,
-      int version, long startTime, StartContainerRequest startRequest)
-      throws IOException {
+      int version, long startTime, StartContainerRequest startRequest) {
     RecoveredContainerState rcs = new RecoveredContainerState();
     rcs.startRequest = startRequest;
     rcs.version = version;
+    try {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(startRequest.getContainerToken());
+      rcs.capability =
+          new ResourcePBImpl(containerTokenIdentifier.getProto().getResource());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
     rcs.setStartTime(startTime);
     containerStates.put(containerId, rcs);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d3c39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 265b3e6..c8c07d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -238,7 +238,9 @@ public class TestNMLeveldbStateStoreService {
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 4);
     ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
-    StartContainerRequest containerReq = createContainerRequest(containerId);
+    Resource containerResource = Resource.newInstance(1024, 2);
+    StartContainerRequest containerReq =
+        createContainerRequest(containerId, containerResource);
 
     // store a container and verify recovered
     long containerStartTime = System.currentTimeMillis();
@@ -260,6 +262,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(false, rcs.getKilled());
     assertEquals(containerReq, rcs.getStartRequest());
     assertTrue(rcs.getDiagnostics().isEmpty());
+    assertEquals(containerResource, rcs.getCapability());
 
     // store a new container record without StartContainerRequest
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
@@ -279,6 +282,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(false, rcs.getKilled());
     assertEquals(containerReq, rcs.getStartRequest());
     assertTrue(rcs.getDiagnostics().isEmpty());
+    assertEquals(containerResource, rcs.getCapability());
 
     // launch the container, add some diagnostics, and verify recovered
     StringBuilder diags = new StringBuilder();
@@ -294,6 +298,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(false, rcs.getKilled());
     assertEquals(containerReq, rcs.getStartRequest());
     assertEquals(diags.toString(), rcs.getDiagnostics());
+    assertEquals(containerResource, rcs.getCapability());
 
     // pause the container, and verify recovered
     stateStore.storeContainerPaused(containerId);
@@ -395,7 +400,17 @@ public class TestNMLeveldbStateStoreService {
   }
 
   private StartContainerRequest createContainerRequest(
+          ContainerId containerId, Resource res) {
+    return createContainerRequestInternal(containerId, res);
+  }
+
+  private StartContainerRequest createContainerRequest(
       ContainerId containerId) {
+    return createContainerRequestInternal(containerId, null);
+  }
+
+  private StartContainerRequest createContainerRequestInternal(ContainerId
+          containerId, Resource res) {
     LocalResource lrsrc = LocalResource.newInstance(
         URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
         LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
@@ -421,6 +436,10 @@ public class TestNMLeveldbStateStoreService {
         localResources, env, containerCmds, serviceData, containerTokens,
         acls);
     Resource containerRsrc = Resource.newInstance(1357, 3);
+
+    if (res != null) {
+      containerRsrc = res;
+    }
     ContainerTokenIdentifier containerTokenId =
         new ContainerTokenIdentifier(containerId, "host", "user",
             containerRsrc, 9876543210L, 42, 2468, Priority.newInstance(7),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org