You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/11/26 20:45:21 UTC

[hadoop] branch trunk updated: YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 828ab40  YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser
828ab40 is described below

commit 828ab400eea64ebb628a36cc3d0d53de0bf38934
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Tue Nov 26 21:44:22 2019 +0100

    YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser
---
 .../recovery/TestNMLeveldbStateStoreService.java   | 562 +++++++++++++++------
 1 file changed, 406 insertions(+), 156 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 8c27474..06ad727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -329,91 +329,105 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(appProto1, apps.get(0));
   }
 
-  @Test
-  public void testContainerStorage() throws IOException {
-    // test empty when no state
-    List<RecoveredContainerState> recoveredContainers =
-        loadContainersState(stateStore.getContainerStateIterator());
-    assertTrue(recoveredContainers.isEmpty());
-
-    // create a container request
-    ApplicationId appId = ApplicationId.newInstance(1234, 3);
-    ApplicationAttemptId appAttemptId =
-        ApplicationAttemptId.newInstance(appId, 4);
-    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
-    Resource containerResource = Resource.newInstance(1024, 2);
-    StartContainerRequest containerReq =
-        createContainerRequest(containerId, containerResource);
-
-    // store a container and verify recovered
-    long containerStartTime = System.currentTimeMillis();
-    stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
-
-    // verify the container version key is not stored for new containers
-    DB db = stateStore.getDB();
-    assertNull("version key present for new container", db.get(bytes(
-        stateStore.getContainerVersionKey(containerId.toString()))));
 
+  @Test
+  public void testContainerStorageWhenContainerIsRequested()
+      throws IOException {
+    final ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    RecoveredContainerState rcs = recoveredContainers.get(0);
+    final RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(0, rcs.getVersion());
-    assertEquals(containerStartTime, rcs.getStartTime());
+    assertEquals(containerParams.getContainerStartTime().longValue(),
+        rcs.getStartTime());
     assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
-    assertEquals(containerReq, rcs.getStartRequest());
+    assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest());
     assertTrue(rcs.getDiagnostics().isEmpty());
-    assertEquals(containerResource, rcs.getCapability());
+    assertEquals(containerParams.getContainerResource(), rcs.getCapability());
+  }
+
 
-    // store a new container record without StartContainerRequest
-    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
-    stateStore.storeContainerLaunched(containerId1);
-    recoveredContainers =
-        loadContainersState(stateStore.getContainerStateIterator());
-    // check whether the new container record is discarded
-    assertEquals(1, recoveredContainers.size());
 
-    // queue the container, and verify recovered
+  @Test
+  public void testContainerStorageWhenContainerIsQueued()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    StartContainerRequest containerReq = containerParams.getContainerRequest();
+    Resource containerResource = containerParams.getContainerResource();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+
     stateStore.storeContainerQueued(containerId);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
     assertEquals(containerReq, rcs.getStartRequest());
     assertTrue(rcs.getDiagnostics().isEmpty());
     assertEquals(containerResource, rcs.getCapability());
+  }
 
-    // launch the container, add some diagnostics, and verify recovered
-    StringBuilder diags = new StringBuilder();
-    stateStore.storeContainerLaunched(containerId);
-    diags.append("some diags for container");
-    stateStore.storeContainerDiagnostics(containerId, diags);
+  @Test
+  public void testContainerStorageWhenContainerIsLaunched()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    StartContainerRequest containerReq = containerParams.getContainerRequest();
+    Resource containerResource = containerParams.getContainerResource();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+    stateStore.storeContainerQueued(containerId);
+
+    StringBuilder diags = launchContainerWithDiagnostics(containerId);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
     assertEquals(containerReq, rcs.getStartRequest());
     assertEquals(diags.toString(), rcs.getDiagnostics());
     assertEquals(containerResource, rcs.getCapability());
+  }
+
+  @Test
+  public void testContainerStorageWhenContainerIsPaused()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    StartContainerRequest containerReq = containerParams.getContainerRequest();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+    stateStore.storeContainerQueued(containerId);
 
-    // pause the container, and verify recovered
     stateStore.storeContainerPaused(containerId);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
@@ -425,82 +439,261 @@ public class TestNMLeveldbStateStoreService {
     recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
+  }
 
-    // increase the container size, and verify recovered
-    ContainerTokenIdentifier updateTokenIdentifier =
-        new ContainerTokenIdentifier(containerId, "host", "user",
-            Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
-            Priority.newInstance(7), 13579);
+  @Test
+  public void testContainerStorageWhenContainerSizeIncreased()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
 
-    stateStore
-        .storeContainerUpdateToken(containerId, updateTokenIdentifier);
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+    stateStore.storeContainerQueued(containerId);
+    launchContainerWithDiagnostics(containerId);
+
+    increaseContainerSize(containerId);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(0, rcs.getVersion());
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertEquals(false, rcs.getKilled());
     assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
+  }
 
-    // mark the container killed, add some more diags, and verify recovered
-    diags.append("some more diags for container");
-    stateStore.storeContainerDiagnostics(containerId, diags);
-    stateStore.storeContainerKilled(containerId);
+  @Test
+  public void testContainerStorageWhenContainerMarkedAsKilled()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+    stateStore.storeContainerQueued(containerId);
+    StringBuilder diags = launchContainerWithDiagnostics(containerId);
+    ContainerTokenIdentifier updateTokenIdentifier =
+        increaseContainerSize(containerId);
+
+    markContainerAsKilled(containerId, diags);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
     assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
     assertTrue(rcs.getKilled());
     ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
-        .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
+        .newContainerTokenIdentifier(rcs.getStartRequest()
+            .getContainerToken());
     assertEquals(updateTokenIdentifier, tokenReadFromRequest);
     assertEquals(diags.toString(), rcs.getDiagnostics());
+  }
 
-    // add yet more diags, mark container completed, and verify recovered
+  @Test
+  public void testContainerStorageWhenContainerCompleted()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+    ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+    storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+    stateStore.storeContainerQueued(containerId);
+    StringBuilder diags = launchContainerWithDiagnostics(containerId);
+    markContainerAsKilled(containerId, diags);
+
+    // add yet more diags, mark container completed
     diags.append("some final diags");
     stateStore.storeContainerDiagnostics(containerId, diags);
     stateStore.storeContainerCompleted(containerId, 21);
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
     assertEquals(21, rcs.getExitCode());
     assertTrue(rcs.getKilled());
     assertEquals(diags.toString(), rcs.getDiagnostics());
+  }
+
+  @Test
+  public void testContainerStorage() throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
 
-    // store remainingRetryAttempts, workDir and logDir
+    // remaining retry attempts, work dir and log dir are stored
     stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
     stateStore.storeContainerWorkDir(containerId, "/test/workdir");
     stateStore.storeContainerLogDir(containerId, "/test/logdir");
     restartStateStore();
-    recoveredContainers =
+
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
-    rcs = recoveredContainers.get(0);
+    RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(6, rcs.getRemainingRetryAttempts());
     assertEquals("/test/workdir", rcs.getWorkDir());
     assertEquals("/test/logdir", rcs.getLogDir());
-
     validateRetryAttempts(containerId);
+  }
+
+  @Test
+  public void testContainerStorageWhenContainerRemoved()
+      throws IOException {
+    ContainerStateConstructParams containerParams =
+        storeContainerInStateStore();
+    ContainerId containerId = containerParams.getContainerId();
+
     // remove the container and verify not recovered
     stateStore.removeContainer(containerId);
     restartStateStore();
-    recoveredContainers =
+    List<RecoveredContainerState> recoveredContainers =
         loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
     // recover again to check remove clears all containers
     restartStateStore();
     NMStateStoreService nmStoreSpy = spy(stateStore);
     loadContainersState(nmStoreSpy.getContainerStateIterator());
-    verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
+    verify(nmStoreSpy, times(0)).removeContainer(any(ContainerId.class));
+  }
+
+  private ContainerStateConstructParams storeContainerInStateStore()
+      throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    Resource containerResource = Resource.newInstance(1024, 2);
+    StartContainerRequest containerReq =
+        createContainerRequest(containerId, containerResource);
+
+    long anyContainerStartTime = 1573155078494L;
+    stateStore.storeContainer(containerId, 0, anyContainerStartTime,
+        containerReq);
+
+    // verify the container version key is not stored for new containers
+    DB db = stateStore.getDB();
+    assertNull("version key present for new container", db.get(bytes(
+        stateStore.getContainerVersionKey(containerId.toString()))));
+
+    return new ContainerStateConstructParams()
+        .setContainerRequest(containerReq)
+        .setContainerResource(containerResource)
+        .setContainerStartTime(anyContainerStartTime)
+        .setAppAttemptId(appAttemptId)
+        .setContainerId(containerId);
+  }
+
+  private static class ContainerStateConstructParams {
+    private StartContainerRequest containerRequest;
+    private Resource containerResource;
+    private Long containerStartTime;
+    private ApplicationAttemptId appAttemptId;
+    private ContainerId containerId;
+
+    public ApplicationAttemptId getAppAttemptId() {
+      return appAttemptId;
+    }
+    public ContainerStateConstructParams setAppAttemptId(ApplicationAttemptId
+        theAppAttemptId) {
+      this.appAttemptId = theAppAttemptId;
+      return this;
+    }
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+    public ContainerStateConstructParams setContainerId(ContainerId
+        theContainerId) {
+      this.containerId = theContainerId;
+      return this;
+    }
+
+    public StartContainerRequest getContainerRequest() {
+      return containerRequest;
+    }
+    public ContainerStateConstructParams setContainerRequest(
+        StartContainerRequest theContainerRequest) {
+      this.containerRequest = theContainerRequest;
+      return this;
+    }
+
+    public Resource getContainerResource() {
+      return containerResource;
+    }
+
+    public ContainerStateConstructParams setContainerResource(
+        Resource theContainerResource) {
+      this.containerResource = theContainerResource;
+      return this;
+    }
+
+    public Long getContainerStartTime() {
+      return containerStartTime;
+    }
+
+    public ContainerStateConstructParams setContainerStartTime(
+        Long theContainerStartTime) {
+      this.containerStartTime = theContainerStartTime;
+      return this;
+    }
+  }
+
+  private void markContainerAsKilled(ContainerId containerId,
+      StringBuilder diags) throws IOException {
+    // mark the container killed, add some more diags
+    diags.append("some more diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    stateStore.storeContainerKilled(containerId);
+  }
+
+  private ContainerTokenIdentifier increaseContainerSize(
+      ContainerId containerId) throws IOException {
+    ContainerTokenIdentifier updateTokenIdentifier =
+        new ContainerTokenIdentifier(containerId, "host", "user",
+            Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
+            Priority.newInstance(7), 13579);
+    stateStore
+        .storeContainerUpdateToken(containerId, updateTokenIdentifier);
+    return updateTokenIdentifier;
+  }
+
+  private StringBuilder launchContainerWithDiagnostics(ContainerId containerId)
+      throws IOException {
+    StringBuilder diags = new StringBuilder();
+    stateStore.storeContainerLaunched(containerId);
+    diags.append("some diags for container");
+    stateStore.storeContainerDiagnostics(containerId, diags);
+    return diags;
+  }
+
+  private void storeNewContainerRecordWithoutStartContainerRequest(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    // store a new container record without StartContainerRequest
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
+    stateStore.storeContainerLaunched(containerId1);
+
+    List<RecoveredContainerState> recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
+    // check whether the new container record is discarded
+    assertEquals(1, recoveredContainers.size());
   }
 
   private void validateRetryAttempts(ContainerId containerId)
@@ -524,11 +717,6 @@ public class TestNMLeveldbStateStoreService {
     return createContainerRequestInternal(containerId, res);
   }
 
-  private StartContainerRequest createContainerRequest(
-      ContainerId containerId) {
-    return createContainerRequestInternal(containerId, null);
-  }
-
   private StartContainerRequest createContainerRequestInternal(ContainerId
           containerId, Resource res) {
     LocalResource lrsrc = LocalResource.newInstance(
@@ -545,9 +733,9 @@ public class TestNMLeveldbStateStoreService {
     containerCmds.add("somearg");
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
     serviceData.put("someservice",
-        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+        ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
     ByteBuffer containerTokens =
-        ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+        ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
     Map<ApplicationAccessType, String> acls =
         new HashMap<ApplicationAccessType, String>();
     acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
@@ -676,7 +864,8 @@ public class TestNMLeveldbStateStoreService {
   }
 
   @Test
-  public void testStartResourceLocalization() throws IOException {
+  public void testStartResourceLocalizationForApplicationResource()
+      throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);
 
@@ -730,10 +919,14 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(1, startedResources.size());
     assertEquals(appRsrcLocalPath,
         startedResources.get(appRsrcProto));
+  }
 
-    // start some public and private resources
+  @Test
+  public void testStartResourceLocalizationForPublicResources()
+      throws IOException {
     Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
             URL.fromPath(pubRsrcPath1),
             LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
             789L, 135L);
@@ -750,23 +943,14 @@ public class TestNMLeveldbStateStoreService {
     Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
     stateStore.startResourceLocalization(null, null, pubRsrcProto2,
         pubRsrcLocalPath2);
-    Path privRsrcPath = new Path("hdfs://some/private/resource");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
-            URL.fromPath(privRsrcPath),
-            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
-            789L, 680L, "*pattern*");
-    LocalResourceProto privRsrcProto = rsrcPb.getProto();
-    Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
-    stateStore.startResourceLocalization(user, null, privRsrcProto,
-        privRsrcLocalPath);
 
     // restart and verify resources are marked in-progress
     restartStateStore();
-    state = stateStore.loadLocalizationState();
-    pubts = state.getPublicTrackerState();
-    completedResources = loadCompletedResources(
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    LocalResourceTrackerState pubts = state.getPublicTrackerState();
+    List<LocalizedResourceProto> completedResources = loadCompletedResources(
         pubts.getCompletedResourcesIterator());
-    startedResources = loadStartedResources(
+    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
         pubts.getStartedResourcesIterator());
     assertTrue(completedResources.isEmpty());
     assertEquals(2, startedResources.size());
@@ -774,34 +958,49 @@ public class TestNMLeveldbStateStoreService {
         startedResources.get(pubRsrcProto1));
     assertEquals(pubRsrcLocalPath2,
         startedResources.get(pubRsrcProto2));
-    userResources = loadUserResources(state.getIterator());
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
+    assertEquals(0, userResources.size());
+  }
+
+  @Test
+  public void testStartResourceLocalizationForPrivateResource()
+      throws IOException {
+    Path privRsrcPath = new Path("hdfs://some/private/resource");
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
+            URL.fromPath(privRsrcPath),
+            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
+            789L, 680L, "*pattern*");
+    LocalResourceProto privRsrcProto = rsrcPb.getProto();
+    Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
+    String user = "somebody";
+    stateStore.startResourceLocalization(user, null, privRsrcProto,
+        privRsrcLocalPath);
+
+    // restart and verify resources are marked in-progress
+    restartStateStore();
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
-    rur = userResources.get(user);
-    privts = rur.getPrivateTrackerState();
+    RecoveredUserResources rur = userResources.get(user);
+    LocalResourceTrackerState privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
-    completedResources = loadCompletedResources(
+    List<LocalizedResourceProto> completedResources = loadCompletedResources(
         privts.getCompletedResourcesIterator());
-    startedResources = loadStartedResources(
+    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
         privts.getStartedResourcesIterator());
     assertTrue(completedResources.isEmpty());
     assertEquals(1, startedResources.size());
     assertEquals(privRsrcLocalPath,
         startedResources.get(privRsrcProto));
-    assertEquals(1, rur.getAppTrackerStates().size());
-    appts = rur.getAppTrackerStates().get(appId);
-    assertNotNull(appts);
-    completedResources = loadCompletedResources(
-        appts.getCompletedResourcesIterator());
-    startedResources = loadStartedResources(
-        appts.getStartedResourcesIterator());
-    assertTrue(completedResources.isEmpty());
-    assertEquals(1, startedResources.size());
-    assertEquals(appRsrcLocalPath,
-        startedResources.get(appRsrcProto));
+    assertEquals(0, rur.getAppTrackerStates().size());
   }
 
   @Test
-  public void testFinishResourceLocalization() throws IOException {
+  public void testFinishResourceLocalizationForApplicationResource()
+      throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);
 
@@ -862,10 +1061,14 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(1, completedResources.size());
     assertEquals(appLocalizedProto,
         completedResources.iterator().next());
+  }
 
-    // start some public and private resources
+  @Test
+  public void testFinishResourceLocalizationForPublicResources()
+      throws IOException {
     Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
             URL.fromPath(pubRsrcPath1),
             LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
             789L, 135L);
@@ -882,15 +1085,6 @@ public class TestNMLeveldbStateStoreService {
     Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
     stateStore.startResourceLocalization(null, null, pubRsrcProto2,
         pubRsrcLocalPath2);
-    Path privRsrcPath = new Path("hdfs://some/private/resource");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
-            URL.fromPath(privRsrcPath),
-            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
-            789L, 680L, "*pattern*");
-    LocalResourceProto privRsrcProto = rsrcPb.getProto();
-    Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
-    stateStore.startResourceLocalization(user, null, privRsrcProto,
-        privRsrcLocalPath);
 
     // finish some of the resources
     LocalizedResourceProto pubLocalizedProto1 =
@@ -900,6 +1094,43 @@ public class TestNMLeveldbStateStoreService {
           .setSize(pubRsrcProto1.getSize())
           .build();
     stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
+
+    // restart and verify state
+    restartStateStore();
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    LocalResourceTrackerState pubts = state.getPublicTrackerState();
+    List<LocalizedResourceProto> completedResources = loadCompletedResources(
+        pubts.getCompletedResourcesIterator());
+    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
+        pubts.getStartedResourcesIterator());
+    assertEquals(1, completedResources.size());
+    assertEquals(pubLocalizedProto1,
+        completedResources.iterator().next());
+    assertEquals(1, startedResources.size());
+    assertEquals(pubRsrcLocalPath2,
+        startedResources.get(pubRsrcProto2));
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
+    assertEquals(0, userResources.size());
+  }
+
+  @Test
+  public void testFinishResourceLocalizationForPrivateResource()
+      throws IOException {
+    String user = "somebody";
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+
+    Path privRsrcPath = new Path("hdfs://some/private/resource");
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
+            URL.fromPath(privRsrcPath),
+            LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
+            789L, 680L, "*pattern*");
+    LocalResourceProto privRsrcProto = rsrcPb.getProto();
+    Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
+    stateStore.startResourceLocalization(user, null, privRsrcProto,
+        privRsrcLocalPath);
+
     LocalizedResourceProto privLocalizedProto =
         LocalizedResourceProto.newBuilder()
           .setResource(privRsrcProto)
@@ -910,22 +1141,19 @@ public class TestNMLeveldbStateStoreService {
 
     // restart and verify state
     restartStateStore();
-    state = stateStore.loadLocalizationState();
-    pubts = state.getPublicTrackerState();
-    completedResources = loadCompletedResources(
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    LocalResourceTrackerState pubts = state.getPublicTrackerState();
+    List<LocalizedResourceProto> completedResources = loadCompletedResources(
         pubts.getCompletedResourcesIterator());
-    startedResources = loadStartedResources(
+    Map<LocalResourceProto, Path> startedResources = loadStartedResources(
         pubts.getStartedResourcesIterator());
-    assertEquals(1, completedResources.size());
-    assertEquals(pubLocalizedProto1,
-        completedResources.iterator().next());
-    assertEquals(1, startedResources.size());
-    assertEquals(pubRsrcLocalPath2,
-        startedResources.get(pubRsrcProto2));
-    userResources = loadUserResources(state.getIterator());
+    assertEquals(0, completedResources.size());
+    assertEquals(0, startedResources.size());
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
-    rur = userResources.get(user);
-    privts = rur.getPrivateTrackerState();
+    RecoveredUserResources rur = userResources.get(user);
+    LocalResourceTrackerState privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
     completedResources = loadCompletedResources(
         privts.getCompletedResourcesIterator());
@@ -935,21 +1163,16 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(privLocalizedProto,
         completedResources.iterator().next());
     assertTrue(startedResources.isEmpty());
-    assertEquals(1, rur.getAppTrackerStates().size());
-    appts = rur.getAppTrackerStates().get(appId);
-    assertNotNull(appts);
-    completedResources = loadCompletedResources(
-        appts.getCompletedResourcesIterator());
-    startedResources = loadStartedResources(
-        appts.getStartedResourcesIterator());
+    assertEquals(0, rur.getAppTrackerStates().size());
+    LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
+    assertNull(appts);
     assertTrue(startedResources.isEmpty());
     assertEquals(1, completedResources.size());
-    assertEquals(appLocalizedProto,
-        completedResources.iterator().next());
   }
 
   @Test
-  public void testRemoveLocalizedResource() throws IOException {
+  public void testRemoveLocalizedResourceForApplicationResource()
+      throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);
 
@@ -983,10 +1206,15 @@ public class TestNMLeveldbStateStoreService {
 
     restartStateStore();
     verifyEmptyState();
+  }
 
-    // add public and private resources and remove some
+  @Test
+  public void testRemoveLocalizedResourceForPublicResources()
+      throws IOException {
+    // add public resources and remove some
     Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
             URL.fromPath(pubRsrcPath1),
             LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
             789L, 135L);
@@ -1018,8 +1246,32 @@ public class TestNMLeveldbStateStoreService {
           .build();
     stateStore.finishResourceLocalization(null, null, pubLocalizedProto2);
     stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2);
+
+    // restart and verify state
+    restartStateStore();
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    LocalResourceTrackerState pubts = state.getPublicTrackerState();
+    List<LocalizedResourceProto> completedResources =
+        loadCompletedResources(pubts.getCompletedResourcesIterator());
+    Map<LocalResourceProto, Path> startedResources =
+        loadStartedResources(pubts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
+    assertEquals(pubLocalizedProto1,
+        completedResources.iterator().next());
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
+    assertTrue(userResources.isEmpty());
+  }
+
+  @Test
+  public void testRemoveLocalizedResourceForPrivateResource()
+      throws IOException {
+    String user = "somebody";
+
     Path privRsrcPath = new Path("hdfs://some/private/resource");
-    rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+    LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+        .newInstance(
             URL.fromPath(privRsrcPath),
             LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
             789L, 680L, "*pattern*");
@@ -1038,9 +1290,7 @@ public class TestNMLeveldbStateStoreService {
     Map<LocalResourceProto, Path> startedResources =
         loadStartedResources(pubts.getStartedResourcesIterator());
     assertTrue(startedResources.isEmpty());
-    assertEquals(1, completedResources.size());
-    assertEquals(pubLocalizedProto1,
-        completedResources.iterator().next());
+    assertEquals(0, completedResources.size());
     Map<String, RecoveredUserResources> userResources =
         loadUserResources(state.getIterator());
     assertTrue(userResources.isEmpty());
@@ -1574,9 +1824,9 @@ public class TestNMLeveldbStateStoreService {
     containerCmds.add("somearg");
     Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
     serviceData.put("someservice",
-        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+        ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
     ByteBuffer containerTokens = ByteBuffer
-        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+        .wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
     Map<ApplicationAccessType, String> acls =
         new HashMap<ApplicationAccessType, String>();
     acls.put(ApplicationAccessType.VIEW_APP, "viewuser");


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org