You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2019/11/26 20:45:21 UTC
[hadoop] branch trunk updated: YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 828ab40 YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser
828ab40 is described below
commit 828ab400eea64ebb628a36cc3d0d53de0bf38934
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Tue Nov 26 21:44:22 2019 +0100
YARN-9362. Code cleanup in TestNMLeveldbStateStoreService. Contributed by Denes Gerencser
---
.../recovery/TestNMLeveldbStateStoreService.java | 562 +++++++++++++++------
1 file changed, 406 insertions(+), 156 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 8c27474..06ad727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -329,91 +329,105 @@ public class TestNMLeveldbStateStoreService {
assertEquals(appProto1, apps.get(0));
}
- @Test
- public void testContainerStorage() throws IOException {
- // test empty when no state
- List<RecoveredContainerState> recoveredContainers =
- loadContainersState(stateStore.getContainerStateIterator());
- assertTrue(recoveredContainers.isEmpty());
-
- // create a container request
- ApplicationId appId = ApplicationId.newInstance(1234, 3);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 4);
- ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
- Resource containerResource = Resource.newInstance(1024, 2);
- StartContainerRequest containerReq =
- createContainerRequest(containerId, containerResource);
-
- // store a container and verify recovered
- long containerStartTime = System.currentTimeMillis();
- stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
-
- // verify the container version key is not stored for new containers
- DB db = stateStore.getDB();
- assertNull("version key present for new container", db.get(bytes(
- stateStore.getContainerVersionKey(containerId.toString()))));
+ @Test
+ public void testContainerStorageWhenContainerIsRequested()
+ throws IOException {
+ final ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- RecoveredContainerState rcs = recoveredContainers.get(0);
+ final RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
- assertEquals(containerStartTime, rcs.getStartTime());
+ assertEquals(containerParams.getContainerStartTime().longValue(),
+ rcs.getStartTime());
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
- assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
- assertEquals(containerResource, rcs.getCapability());
+ assertEquals(containerParams.getContainerResource(), rcs.getCapability());
+ }
+
- // store a new container record without StartContainerRequest
- ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
- stateStore.storeContainerLaunched(containerId1);
- recoveredContainers =
- loadContainersState(stateStore.getContainerStateIterator());
- // check whether the new container record is discarded
- assertEquals(1, recoveredContainers.size());
- // queue the container, and verify recovered
+ @Test
+ public void testContainerStorageWhenContainerIsQueued()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ StartContainerRequest containerReq = containerParams.getContainerRequest();
+ Resource containerResource = containerParams.getContainerResource();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+
stateStore.storeContainerQueued(containerId);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(containerResource, rcs.getCapability());
+ }
- // launch the container, add some diagnostics, and verify recovered
- StringBuilder diags = new StringBuilder();
- stateStore.storeContainerLaunched(containerId);
- diags.append("some diags for container");
- stateStore.storeContainerDiagnostics(containerId, diags);
+ @Test
+ public void testContainerStorageWhenContainerIsLaunched()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ StartContainerRequest containerReq = containerParams.getContainerRequest();
+ Resource containerResource = containerParams.getContainerResource();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+ stateStore.storeContainerQueued(containerId);
+
+ StringBuilder diags = launchContainerWithDiagnostics(containerId);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
assertEquals(containerResource, rcs.getCapability());
+ }
+
+ @Test
+ public void testContainerStorageWhenContainerIsPaused()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ StartContainerRequest containerReq = containerParams.getContainerRequest();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+ stateStore.storeContainerQueued(containerId);
- // pause the container, and verify recovered
stateStore.storeContainerPaused(containerId);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
@@ -425,82 +439,261 @@ public class TestNMLeveldbStateStoreService {
recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
+ }
- // increase the container size, and verify recovered
- ContainerTokenIdentifier updateTokenIdentifier =
- new ContainerTokenIdentifier(containerId, "host", "user",
- Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
- Priority.newInstance(7), 13579);
+ @Test
+ public void testContainerStorageWhenContainerSizeIncreased()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
- stateStore
- .storeContainerUpdateToken(containerId, updateTokenIdentifier);
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+ stateStore.storeContainerQueued(containerId);
+ launchContainerWithDiagnostics(containerId);
+
+ increaseContainerSize(containerId);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
+ }
- // mark the container killed, add some more diags, and verify recovered
- diags.append("some more diags for container");
- stateStore.storeContainerDiagnostics(containerId, diags);
- stateStore.storeContainerKilled(containerId);
+ @Test
+ public void testContainerStorageWhenContainerMarkedAsKilled()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+ stateStore.storeContainerQueued(containerId);
+ StringBuilder diags = launchContainerWithDiagnostics(containerId);
+ ContainerTokenIdentifier updateTokenIdentifier =
+ increaseContainerSize(containerId);
+
+ markContainerAsKilled(containerId, diags);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertTrue(rcs.getKilled());
ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
- .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
+ .newContainerTokenIdentifier(rcs.getStartRequest()
+ .getContainerToken());
assertEquals(updateTokenIdentifier, tokenReadFromRequest);
assertEquals(diags.toString(), rcs.getDiagnostics());
+ }
- // add yet more diags, mark container completed, and verify recovered
+ @Test
+ public void testContainerStorageWhenContainerCompleted()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+ ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId();
+
+ storeNewContainerRecordWithoutStartContainerRequest(appAttemptId);
+ stateStore.storeContainerQueued(containerId);
+ StringBuilder diags = launchContainerWithDiagnostics(containerId);
+ markContainerAsKilled(containerId, diags);
+
+ // add yet more diags, mark container completed
diags.append("some final diags");
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerCompleted(containerId, 21);
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
assertEquals(21, rcs.getExitCode());
assertTrue(rcs.getKilled());
assertEquals(diags.toString(), rcs.getDiagnostics());
+ }
+
+ @Test
+ public void testContainerStorage() throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
- // store remainingRetryAttempts, workDir and logDir
+ // remaining retry attempts, work dir and log dir are stored
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
- recoveredContainers =
+
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
- rcs = recoveredContainers.get(0);
+ RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
-
validateRetryAttempts(containerId);
+ }
+
+ @Test
+ public void testContainerStorageWhenContainerRemoved()
+ throws IOException {
+ ContainerStateConstructParams containerParams =
+ storeContainerInStateStore();
+ ContainerId containerId = containerParams.getContainerId();
+
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
- recoveredContainers =
+ List<RecoveredContainerState> recoveredContainers =
loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// recover again to check remove clears all containers
restartStateStore();
NMStateStoreService nmStoreSpy = spy(stateStore);
loadContainersState(nmStoreSpy.getContainerStateIterator());
- verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
+ verify(nmStoreSpy, times(0)).removeContainer(any(ContainerId.class));
+ }
+
+ private ContainerStateConstructParams storeContainerInStateStore()
+ throws IOException {
+ // test empty when no state
+ List<RecoveredContainerState> recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
+ assertTrue(recoveredContainers.isEmpty());
+
+ // create a container request
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 4);
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+ Resource containerResource = Resource.newInstance(1024, 2);
+ StartContainerRequest containerReq =
+ createContainerRequest(containerId, containerResource);
+
+ long anyContainerStartTime = 1573155078494L;
+ stateStore.storeContainer(containerId, 0, anyContainerStartTime,
+ containerReq);
+
+ // verify the container version key is not stored for new containers
+ DB db = stateStore.getDB();
+ assertNull("version key present for new container", db.get(bytes(
+ stateStore.getContainerVersionKey(containerId.toString()))));
+
+ return new ContainerStateConstructParams()
+ .setContainerRequest(containerReq)
+ .setContainerResource(containerResource)
+ .setContainerStartTime(anyContainerStartTime)
+ .setAppAttemptId(appAttemptId)
+ .setContainerId(containerId);
+ }
+
+ private static class ContainerStateConstructParams {
+ private StartContainerRequest containerRequest;
+ private Resource containerResource;
+ private Long containerStartTime;
+ private ApplicationAttemptId appAttemptId;
+ private ContainerId containerId;
+
+ public ApplicationAttemptId getAppAttemptId() {
+ return appAttemptId;
+ }
+ public ContainerStateConstructParams setAppAttemptId(ApplicationAttemptId
+ theAppAttemptId) {
+ this.appAttemptId = theAppAttemptId;
+ return this;
+ }
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+ public ContainerStateConstructParams setContainerId(ContainerId
+ theContainerId) {
+ this.containerId = theContainerId;
+ return this;
+ }
+
+ public StartContainerRequest getContainerRequest() {
+ return containerRequest;
+ }
+ public ContainerStateConstructParams setContainerRequest(
+ StartContainerRequest theContainerRequest) {
+ this.containerRequest = theContainerRequest;
+ return this;
+ }
+
+ public Resource getContainerResource() {
+ return containerResource;
+ }
+
+ public ContainerStateConstructParams setContainerResource(
+ Resource theContainerResource) {
+ this.containerResource = theContainerResource;
+ return this;
+ }
+
+ public Long getContainerStartTime() {
+ return containerStartTime;
+ }
+
+ public ContainerStateConstructParams setContainerStartTime(
+ Long theContainerStartTime) {
+ this.containerStartTime = theContainerStartTime;
+ return this;
+ }
+ }
+
+ private void markContainerAsKilled(ContainerId containerId,
+ StringBuilder diags) throws IOException {
+ // mark the container killed, add some more diags
+ diags.append("some more diags for container");
+ stateStore.storeContainerDiagnostics(containerId, diags);
+ stateStore.storeContainerKilled(containerId);
+ }
+
+ private ContainerTokenIdentifier increaseContainerSize(
+ ContainerId containerId) throws IOException {
+ ContainerTokenIdentifier updateTokenIdentifier =
+ new ContainerTokenIdentifier(containerId, "host", "user",
+ Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
+ Priority.newInstance(7), 13579);
+ stateStore
+ .storeContainerUpdateToken(containerId, updateTokenIdentifier);
+ return updateTokenIdentifier;
+ }
+
+ private StringBuilder launchContainerWithDiagnostics(ContainerId containerId)
+ throws IOException {
+ StringBuilder diags = new StringBuilder();
+ stateStore.storeContainerLaunched(containerId);
+ diags.append("some diags for container");
+ stateStore.storeContainerDiagnostics(containerId, diags);
+ return diags;
+ }
+
+ private void storeNewContainerRecordWithoutStartContainerRequest(
+ ApplicationAttemptId appAttemptId) throws IOException {
+ // store a new container record without StartContainerRequest
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
+ stateStore.storeContainerLaunched(containerId1);
+
+ List<RecoveredContainerState> recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
+ // check whether the new container record is discarded
+ assertEquals(1, recoveredContainers.size());
}
private void validateRetryAttempts(ContainerId containerId)
@@ -524,11 +717,6 @@ public class TestNMLeveldbStateStoreService {
return createContainerRequestInternal(containerId, res);
}
- private StartContainerRequest createContainerRequest(
- ContainerId containerId) {
- return createContainerRequestInternal(containerId, null);
- }
-
private StartContainerRequest createContainerRequestInternal(ContainerId
containerId, Resource res) {
LocalResource lrsrc = LocalResource.newInstance(
@@ -545,9 +733,9 @@ public class TestNMLeveldbStateStoreService {
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
- ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+ ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
ByteBuffer containerTokens =
- ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+ ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
@@ -676,7 +864,8 @@ public class TestNMLeveldbStateStoreService {
}
@Test
- public void testStartResourceLocalization() throws IOException {
+ public void testStartResourceLocalizationForApplicationResource()
+ throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
@@ -730,10 +919,14 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath,
startedResources.get(appRsrcProto));
+ }
- // start some public and private resources
+ @Test
+ public void testStartResourceLocalizationForPublicResources()
+ throws IOException {
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
@@ -750,23 +943,14 @@ public class TestNMLeveldbStateStoreService {
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
- Path privRsrcPath = new Path("hdfs://some/private/resource");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
- URL.fromPath(privRsrcPath),
- LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
- 789L, 680L, "*pattern*");
- LocalResourceProto privRsrcProto = rsrcPb.getProto();
- Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
- stateStore.startResourceLocalization(user, null, privRsrcProto,
- privRsrcLocalPath);
// restart and verify resources are marked in-progress
restartStateStore();
- state = stateStore.loadLocalizationState();
- pubts = state.getPublicTrackerState();
- completedResources = loadCompletedResources(
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ LocalResourceTrackerState pubts = state.getPublicTrackerState();
+ List<LocalizedResourceProto> completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator());
- startedResources = loadStartedResources(
+ Map<LocalResourceProto, Path> startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(2, startedResources.size());
@@ -774,34 +958,49 @@ public class TestNMLeveldbStateStoreService {
startedResources.get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
startedResources.get(pubRsrcProto2));
- userResources = loadUserResources(state.getIterator());
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
+ assertEquals(0, userResources.size());
+ }
+
+ @Test
+ public void testStartResourceLocalizationForPrivateResource()
+ throws IOException {
+ Path privRsrcPath = new Path("hdfs://some/private/resource");
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
+ URL.fromPath(privRsrcPath),
+ LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
+ 789L, 680L, "*pattern*");
+ LocalResourceProto privRsrcProto = rsrcPb.getProto();
+ Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
+ String user = "somebody";
+ stateStore.startResourceLocalization(user, null, privRsrcProto,
+ privRsrcLocalPath);
+
+ // restart and verify resources are marked in-progress
+ restartStateStore();
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
- rur = userResources.get(user);
- privts = rur.getPrivateTrackerState();
+ RecoveredUserResources rur = userResources.get(user);
+ LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
- completedResources = loadCompletedResources(
+ List<LocalizedResourceProto> completedResources = loadCompletedResources(
privts.getCompletedResourcesIterator());
- startedResources = loadStartedResources(
+ Map<LocalResourceProto, Path> startedResources = loadStartedResources(
privts.getStartedResourcesIterator());
assertTrue(completedResources.isEmpty());
assertEquals(1, startedResources.size());
assertEquals(privRsrcLocalPath,
startedResources.get(privRsrcProto));
- assertEquals(1, rur.getAppTrackerStates().size());
- appts = rur.getAppTrackerStates().get(appId);
- assertNotNull(appts);
- completedResources = loadCompletedResources(
- appts.getCompletedResourcesIterator());
- startedResources = loadStartedResources(
- appts.getStartedResourcesIterator());
- assertTrue(completedResources.isEmpty());
- assertEquals(1, startedResources.size());
- assertEquals(appRsrcLocalPath,
- startedResources.get(appRsrcProto));
+ assertEquals(0, rur.getAppTrackerStates().size());
}
@Test
- public void testFinishResourceLocalization() throws IOException {
+ public void testFinishResourceLocalizationForApplicationResource()
+ throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
@@ -862,10 +1061,14 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto,
completedResources.iterator().next());
+ }
- // start some public and private resources
+ @Test
+ public void testFinishResourceLocalizationForPublicResources()
+ throws IOException {
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
@@ -882,15 +1085,6 @@ public class TestNMLeveldbStateStoreService {
Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2");
stateStore.startResourceLocalization(null, null, pubRsrcProto2,
pubRsrcLocalPath2);
- Path privRsrcPath = new Path("hdfs://some/private/resource");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
- URL.fromPath(privRsrcPath),
- LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
- 789L, 680L, "*pattern*");
- LocalResourceProto privRsrcProto = rsrcPb.getProto();
- Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
- stateStore.startResourceLocalization(user, null, privRsrcProto,
- privRsrcLocalPath);
// finish some of the resources
LocalizedResourceProto pubLocalizedProto1 =
@@ -900,6 +1094,43 @@ public class TestNMLeveldbStateStoreService {
.setSize(pubRsrcProto1.getSize())
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto1);
+
+ // restart and verify state
+ restartStateStore();
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ LocalResourceTrackerState pubts = state.getPublicTrackerState();
+ List<LocalizedResourceProto> completedResources = loadCompletedResources(
+ pubts.getCompletedResourcesIterator());
+ Map<LocalResourceProto, Path> startedResources = loadStartedResources(
+ pubts.getStartedResourcesIterator());
+ assertEquals(1, completedResources.size());
+ assertEquals(pubLocalizedProto1,
+ completedResources.iterator().next());
+ assertEquals(1, startedResources.size());
+ assertEquals(pubRsrcLocalPath2,
+ startedResources.get(pubRsrcProto2));
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
+ assertEquals(0, userResources.size());
+ }
+
+ @Test
+ public void testFinishResourceLocalizationForPrivateResource()
+ throws IOException {
+ String user = "somebody";
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+
+ Path privRsrcPath = new Path("hdfs://some/private/resource");
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
+ URL.fromPath(privRsrcPath),
+ LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
+ 789L, 680L, "*pattern*");
+ LocalResourceProto privRsrcProto = rsrcPb.getProto();
+ Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc");
+ stateStore.startResourceLocalization(user, null, privRsrcProto,
+ privRsrcLocalPath);
+
LocalizedResourceProto privLocalizedProto =
LocalizedResourceProto.newBuilder()
.setResource(privRsrcProto)
@@ -910,22 +1141,19 @@ public class TestNMLeveldbStateStoreService {
// restart and verify state
restartStateStore();
- state = stateStore.loadLocalizationState();
- pubts = state.getPublicTrackerState();
- completedResources = loadCompletedResources(
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ LocalResourceTrackerState pubts = state.getPublicTrackerState();
+ List<LocalizedResourceProto> completedResources = loadCompletedResources(
pubts.getCompletedResourcesIterator());
- startedResources = loadStartedResources(
+ Map<LocalResourceProto, Path> startedResources = loadStartedResources(
pubts.getStartedResourcesIterator());
- assertEquals(1, completedResources.size());
- assertEquals(pubLocalizedProto1,
- completedResources.iterator().next());
- assertEquals(1, startedResources.size());
- assertEquals(pubRsrcLocalPath2,
- startedResources.get(pubRsrcProto2));
- userResources = loadUserResources(state.getIterator());
+ assertEquals(0, completedResources.size());
+ assertEquals(0, startedResources.size());
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
- rur = userResources.get(user);
- privts = rur.getPrivateTrackerState();
+ RecoveredUserResources rur = userResources.get(user);
+ LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
completedResources = loadCompletedResources(
privts.getCompletedResourcesIterator());
@@ -935,21 +1163,16 @@ public class TestNMLeveldbStateStoreService {
assertEquals(privLocalizedProto,
completedResources.iterator().next());
assertTrue(startedResources.isEmpty());
- assertEquals(1, rur.getAppTrackerStates().size());
- appts = rur.getAppTrackerStates().get(appId);
- assertNotNull(appts);
- completedResources = loadCompletedResources(
- appts.getCompletedResourcesIterator());
- startedResources = loadStartedResources(
- appts.getStartedResourcesIterator());
+ assertEquals(0, rur.getAppTrackerStates().size());
+ LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
+ assertNull(appts);
assertTrue(startedResources.isEmpty());
assertEquals(1, completedResources.size());
- assertEquals(appLocalizedProto,
- completedResources.iterator().next());
}
@Test
- public void testRemoveLocalizedResource() throws IOException {
+ public void testRemoveLocalizedResourceForApplicationResource()
+ throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
@@ -983,10 +1206,15 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
verifyEmptyState();
+ }
- // add public and private resources and remove some
+ @Test
+ public void testRemoveLocalizedResourceForPublicResources()
+ throws IOException {
+ // add public resources and remove some
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
URL.fromPath(pubRsrcPath1),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC,
789L, 135L);
@@ -1018,8 +1246,32 @@ public class TestNMLeveldbStateStoreService {
.build();
stateStore.finishResourceLocalization(null, null, pubLocalizedProto2);
stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2);
+
+ // restart and verify state
+ restartStateStore();
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ LocalResourceTrackerState pubts = state.getPublicTrackerState();
+ List<LocalizedResourceProto> completedResources =
+ loadCompletedResources(pubts.getCompletedResourcesIterator());
+ Map<LocalResourceProto, Path> startedResources =
+ loadStartedResources(pubts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
+ assertEquals(pubLocalizedProto1,
+ completedResources.iterator().next());
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
+ assertTrue(userResources.isEmpty());
+ }
+
+ @Test
+ public void testRemoveLocalizedResourceForPrivateResource()
+ throws IOException {
+ String user = "somebody";
+
Path privRsrcPath = new Path("hdfs://some/private/resource");
- rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance(
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource
+ .newInstance(
URL.fromPath(privRsrcPath),
LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE,
789L, 680L, "*pattern*");
@@ -1038,9 +1290,7 @@ public class TestNMLeveldbStateStoreService {
Map<LocalResourceProto, Path> startedResources =
loadStartedResources(pubts.getStartedResourcesIterator());
assertTrue(startedResources.isEmpty());
- assertEquals(1, completedResources.size());
- assertEquals(pubLocalizedProto1,
- completedResources.iterator().next());
+ assertEquals(0, completedResources.size());
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty());
@@ -1574,9 +1824,9 @@ public class TestNMLeveldbStateStoreService {
containerCmds.add("somearg");
Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
serviceData.put("someservice",
- ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+ ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3}));
ByteBuffer containerTokens = ByteBuffer
- .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+ .wrap(new byte[] {0x7, 0x8, 0x9, 0xa});
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org