You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/09/14 00:49:38 UTC
svn commit: r1170378 [8/12] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-clie...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Tue Sep 13 22:49:27 2011
@@ -17,208 +17,203 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import java.net.URISyntaxException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Random;
import java.util.Map.Entry;
-import java.util.AbstractMap.SimpleEntry;
+import java.util.Random;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
public class TestContainer {
final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+
/**
* Verify correct container request events sent to localizer.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testLocalizationRequest() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ WrappedContainer wc = null;
try {
- dispatcher.start();
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- // null serviceData; no registered AuxServicesEventType handler
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- ContainerId cId = getMockContainerId(7, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testLocalizationRequest seed: " + seed);
- final Map<String,LocalResource> localResources = createLocalResources(r);
- when(ctxt.getAllLocalResources()).thenReturn(localResources);
-
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
+ wc = new WrappedContainer(7, 314159265358979L, 4344, "yak");
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
// Verify request for public/private resources to localizer
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
- ContainerReqMatcher matchesPublicReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.PUBLIC));
- ContainerReqMatcher matchesPrivateReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.PRIVATE));
- ContainerReqMatcher matchesAppReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.APPLICATION));
- verify(localizerBus).handle(argThat(matchesPublicReq));
- verify(localizerBus).handle(argThat(matchesPrivateReq));
- verify(localizerBus).handle(argThat(matchesAppReq));
- assertEquals(ContainerState.LOCALIZING, c.getContainerState());
- } finally {
- dispatcher.stop();
+ ResourcesRequestedMatcher matchesReq =
+ new ResourcesRequestedMatcher(wc.localResources, EnumSet.of(
+ LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+ LocalResourceVisibility.APPLICATION));
+ verify(wc.localizerBus).handle(argThat(matchesReq));
+ assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
}
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
}
/**
* Verify container launch when all resources already cached.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testLocalizationLaunch() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ WrappedContainer wc = null;
try {
- dispatcher.start();
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- EventHandler<ContainersLauncherEvent> launcherBus =
- mock(EventHandler.class);
- dispatcher.register(ContainersLauncherEventType.class, launcherBus);
- // null serviceData; no registered AuxServicesEventType handler
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- ContainerId cId = getMockContainerId(8, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testLocalizationLaunch seed: " + seed);
- final Map<String,LocalResource> localResources = createLocalResources(r);
- when(ctxt.getAllLocalResources()).thenReturn(localResources);
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
-
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
-
- // Container prepared for localization events
- Path cache = new Path("file:///cache");
- Map<Path,String> localPaths = new HashMap<Path,String>();
- for (Entry<String,LocalResource> rsrc : localResources.entrySet()) {
- assertEquals(ContainerState.LOCALIZING, c.getContainerState());
- LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
- Path p = new Path(cache, rsrc.getKey());
- localPaths.put(p, rsrc.getKey());
- // rsrc copied to p
- c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p));
- }
- dispatcher.await();
+ wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
+ Map<Path, String> localPaths = wc.localizeResources();
// all resources should be localized
- assertEquals(ContainerState.LOCALIZED, c.getContainerState());
- for (Entry<Path,String> loc : c.getLocalizedResources().entrySet()) {
+ assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+ for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
}
assertTrue(localPaths.isEmpty());
+ final WrappedContainer wcf = wc;
// verify container launch
ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch =
new ArgumentMatcher<ContainersLauncherEvent>() {
@Override
public boolean matches(Object o) {
ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o;
- return c == launchEvent.getContainer();
+ return wcf.c == launchEvent.getContainer();
}
};
- verify(launcherBus).handle(argThat(matchesContainerLaunch));
+ verify(wc.launcherBus).handle(argThat(matchesContainerLaunch));
} finally {
- dispatcher.stop();
+ if (wc != null) {
+ wc.finished();
+ }
}
}
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnFailure() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.containerFailed(ExitCode.KILLED.getExitCode());
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ }
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnSuccess() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.containerSuccessful();
+ assertEquals(ContainerState.EXITED_WITH_SUCCESS,
+ wc.c.getContainerState());
+
+ verifyCleanupCall(wc);
+ }
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnKillRequest() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(12, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.killContainer();
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ wc.containerKilledOnRequest();
+
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
/**
* Verify serviceData correctly sent.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testServiceData() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
- dispatcher.start();
+ WrappedContainer wc = null;
try {
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- EventHandler<AuxServicesEvent> auxBus = mock(EventHandler.class);
- dispatcher.register(AuxServicesEventType.class, auxBus);
- EventHandler<ContainersLauncherEvent> launchBus = mock(EventHandler.class);
- dispatcher.register(ContainersLauncherEventType.class, launchBus);
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
- when(ctxt.getAllLocalResources()).thenReturn(
- Collections.<String,LocalResource>emptyMap());
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testServiceData seed: " + seed);
- final Map<String,ByteBuffer> serviceData = createServiceData(r);
- when(ctxt.getAllServiceData()).thenReturn(serviceData);
-
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
-
- // Verify propagation of service data to AuxServices
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
- for (final Map.Entry<String,ByteBuffer> e : serviceData.entrySet()) {
+ wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true);
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
+
+ for (final Map.Entry<String,ByteBuffer> e : wc.serviceData.entrySet()) {
ArgumentMatcher<AuxServicesEvent> matchesServiceReq =
new ArgumentMatcher<AuxServicesEvent>() {
@Override
@@ -228,9 +223,10 @@ public class TestContainer {
&& 0 == e.getValue().compareTo(evt.getServiceData());
}
};
- verify(auxBus).handle(argThat(matchesServiceReq));
+ verify(wc.auxBus).handle(argThat(matchesServiceReq));
}
+ final WrappedContainer wcf = wc;
// verify launch on empty resource request
ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq =
new ArgumentMatcher<ContainersLauncherEvent>() {
@@ -238,61 +234,103 @@ public class TestContainer {
public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
- && cId == evt.getContainer().getContainerID();
+ && wcf.cId == evt.getContainer().getContainerID();
}
};
- verify(launchBus).handle(argThat(matchesLaunchReq));
+ verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
} finally {
- dispatcher.stop();
+ if (wc != null) {
+ wc.finished();
+ }
}
}
- // Accept iff the resource request payload matches.
- static class ContainerReqMatcher extends ArgumentMatcher<LocalizationEvent> {
+ private void verifyCleanupCall(WrappedContainer wc) throws Exception {
+ ResourcesReleasedMatcher matchesReq =
+ new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
+ LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+ LocalResourceVisibility.APPLICATION));
+ verify(wc.localizerBus).handle(argThat(matchesReq));
+ }
+
+ private static class ResourcesReleasedMatcher extends
+ ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
- new HashSet<LocalResourceRequest>();
- ContainerReqMatcher(Map<String,LocalResource> allResources,
+ new HashSet<LocalResourceRequest>();
+
+ ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
- for (Entry<String,LocalResource> e : allResources.entrySet()) {
+ for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue()));
}
}
}
+
@Override
public boolean matches(Object o) {
- ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o;
+ if (!(o instanceof ContainerLocalizationCleanupEvent)) {
+ return false;
+ }
+ ContainerLocalizationCleanupEvent evt =
+ (ContainerLocalizationCleanupEvent) o;
final HashSet<LocalResourceRequest> expected =
- new HashSet<LocalResourceRequest>(resources);
- for (LocalResourceRequest rsrc : evt.getRequestedResources()) {
- if (!expected.remove(rsrc)) {
- return false;
+ new HashSet<LocalResourceRequest>(resources);
+ for (Collection<LocalResourceRequest> rc : evt.getResources().values()) {
+ for (LocalResourceRequest rsrc : rc) {
+ if (!expected.remove(rsrc)) {
+ return false;
+ }
}
}
return expected.isEmpty();
}
}
- static Entry<String,LocalResource> getMockRsrc(Random r,
- LocalResourceVisibility vis) {
- LocalResource rsrc = mock(LocalResource.class);
+ // Accept iff the resource payload matches.
+ private static class ResourcesRequestedMatcher extends
+ ArgumentMatcher<LocalizationEvent> {
+ final HashSet<LocalResourceRequest> resources =
+ new HashSet<LocalResourceRequest>();
- String name = Long.toHexString(r.nextLong());
- URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
- when(uri.getScheme()).thenReturn("file");
- when(uri.getHost()).thenReturn(null);
- when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
+ ResourcesRequestedMatcher(Map<String, LocalResource> allResources,
+ EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
+ for (Entry<String, LocalResource> e : allResources.entrySet()) {
+ if (vis.contains(e.getValue().getVisibility())) {
+ resources.add(new LocalResourceRequest(e.getValue()));
+ }
+ }
+ }
- when(rsrc.getResource()).thenReturn(uri);
- when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
- when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
- when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
- when(rsrc.getVisibility()).thenReturn(vis);
+ @Override
+ public boolean matches(Object o) {
+ ContainerLocalizationRequestEvent evt =
+ (ContainerLocalizationRequestEvent) o;
+ final HashSet<LocalResourceRequest> expected =
+ new HashSet<LocalResourceRequest>(resources);
+ for (Collection<LocalResourceRequest> rc : evt.getRequestedResources()
+ .values()) {
+ for (LocalResourceRequest rsrc : rc) {
+ if (!expected.remove(rsrc)) {
+ return false;
+ }
+ }
+ }
+ return expected.isEmpty();
+ }
+ }
- return new SimpleEntry<String,LocalResource>(name, rsrc);
+ private static Entry<String, LocalResource> getMockRsrc(Random r,
+ LocalResourceVisibility vis) {
+ String name = Long.toHexString(r.nextLong());
+ URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
+ LocalResource rsrc =
+ BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ return new SimpleEntry<String, LocalResource>(name, rsrc);
}
- static Map<String,LocalResource> createLocalResources(Random r) {
+ private static Map<String,LocalResource> createLocalResources(Random r) {
Map<String,LocalResource> localResources =
new HashMap<String,LocalResource>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -313,17 +351,7 @@ public class TestContainer {
return localResources;
}
- static ContainerId getMockContainerId(int appId, long timestamp, int id) {
- ApplicationId aId = mock(ApplicationId.class);
- when(aId.getId()).thenReturn(appId);
- when(aId.getClusterTimestamp()).thenReturn(timestamp);
- ContainerId cId = mock(ContainerId.class);
- when(cId.getId()).thenReturn(id);
- when(cId.getAppId()).thenReturn(aId);
- return cId;
- }
-
- static Map<String,ByteBuffer> createServiceData(Random r) {
+ private static Map<String,ByteBuffer> createServiceData(Random r) {
Map<String,ByteBuffer> serviceData =
new HashMap<String,ByteBuffer>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -335,7 +363,134 @@ public class TestContainer {
return serviceData;
}
- Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
+ private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
return new ContainerImpl(disp, ctx, null, metrics);
}
+
+ @SuppressWarnings("unchecked")
+ private class WrappedContainer {
+ final DrainDispatcher dispatcher;
+ final EventHandler<LocalizationEvent> localizerBus;
+ final EventHandler<ContainersLauncherEvent> launcherBus;
+ final EventHandler<ContainersMonitorEvent> monitorBus;
+ final EventHandler<AuxServicesEvent> auxBus;
+
+ final ContainerLaunchContext ctxt;
+ final ContainerId cId;
+ final Container c;
+ final Map<String, LocalResource> localResources;
+ final Map<String, ByteBuffer> serviceData;
+ final String user;
+
+ WrappedContainer(int appId, long timestamp, int id, String user) {
+ this(appId, timestamp, id, user, true, false);
+ }
+
+ WrappedContainer(int appId, long timestamp, int id, String user,
+ boolean withLocalRes, boolean withServiceData) {
+ dispatcher = new DrainDispatcher();
+ dispatcher.init(null);
+
+ localizerBus = mock(EventHandler.class);
+ launcherBus = mock(EventHandler.class);
+ monitorBus = mock(EventHandler.class);
+ auxBus = mock(EventHandler.class);
+ dispatcher.register(LocalizationEventType.class, localizerBus);
+ dispatcher.register(ContainersLauncherEventType.class, launcherBus);
+ dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+ dispatcher.register(AuxServicesEventType.class, auxBus);
+ this.user = user;
+
+ ctxt = mock(ContainerLaunchContext.class);
+ cId = BuilderUtils.newContainerId(appId, 1, timestamp, id);
+ when(ctxt.getUser()).thenReturn(this.user);
+ when(ctxt.getContainerId()).thenReturn(cId);
+
+ Resource resource = BuilderUtils.newResource(1024);
+ when(ctxt.getResource()).thenReturn(resource);
+
+ if (withLocalRes) {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("WrappedContainerLocalResource seed: " + seed);
+ localResources = createLocalResources(r);
+ } else {
+ localResources = Collections.<String, LocalResource> emptyMap();
+ }
+ when(ctxt.getLocalResources()).thenReturn(localResources);
+
+ if (withServiceData) {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("ServiceData seed: " + seed);
+ serviceData = createServiceData(r);
+ } else {
+ serviceData = Collections.<String, ByteBuffer> emptyMap();
+ }
+ when(ctxt.getServiceData()).thenReturn(serviceData);
+
+ c = newContainer(dispatcher, ctxt);
+ dispatcher.start();
+ }
+
+ private void drainDispatcherEvents() {
+ dispatcher.await();
+ }
+
+ public void finished() {
+ dispatcher.stop();
+ }
+
+ public void initContainer() {
+ c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+ drainDispatcherEvents();
+ }
+
+ public Map<Path, String> localizeResources() throws URISyntaxException {
+ Path cache = new Path("file:///cache");
+ Map<Path, String> localPaths = new HashMap<Path, String>();
+ for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
+ assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+ Path p = new Path(cache, rsrc.getKey());
+ localPaths.put(p, rsrc.getKey());
+ // rsrc copied to p
+ c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
+ req, p));
+ }
+ drainDispatcherEvents();
+ return localPaths;
+ }
+
+ public void launchContainer() {
+ c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
+ drainDispatcherEvents();
+ }
+
+ public void containerSuccessful() {
+ c.handle(new ContainerEvent(cId,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+ drainDispatcherEvents();
+ }
+
+ public void containerFailed(int exitCode) {
+ c.handle(new ContainerExitEvent(cId,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
+ drainDispatcherEvents();
+ }
+
+ public void killContainer() {
+ c.handle(new ContainerKillEvent(cId, "KillRequest"));
+ drainDispatcherEvents();
+ }
+
+ public void containerKilledOnRequest() {
+ c.handle(new ContainerExitEvent(cId,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
+ .getExitCode()));
+ drainDispatcherEvents();
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java Tue Sep 13 22:49:27 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.no
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -48,9 +49,12 @@ public class TestLocalizedResource {
ApplicationId appId = mock(ApplicationId.class);
when(appId.getClusterTimestamp()).thenReturn(314159265L);
when(appId.getId()).thenReturn(3);
+ ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+ when(appAttemptId.getApplicationId()).thenReturn(appId);
+ when(appAttemptId.getAttemptId()).thenReturn(0);
ContainerId container = mock(ContainerId.class);
when(container.getId()).thenReturn(id);
- when(container.getAppId()).thenReturn(appId);
+ when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
return container;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Tue Sep 13 22:49:27 2011
@@ -21,10 +21,17 @@ package org.apache.hadoop.yarn.server.no
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.Set;
+
+import junit.framework.Assert;
import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration;
@@ -46,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -62,11 +70,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -74,8 +86,6 @@ import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
-
public class TestResourceLocalizationService {
static final Path basedir =
@@ -110,7 +120,7 @@ public class TestResourceLocalizationSer
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
- conf.setStrings(NM_LOCAL_DIR, sDirs);
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
// initialize ResourceLocalizationService
locService.init(conf);
@@ -135,6 +145,190 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
+ public void testResourceRelease() throws Exception {
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+ Server ignore = mock(Server.class);
+ LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ //Ignore actual localization
+ EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = new DeletionService(exec);
+ delService.init(null);
+ delService.start();
+
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(ignore).when(spyService).createServer();
+ doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
+ isA(Configuration.class));
+ doReturn(lfs).when(spyService)
+ .getLocalFileContext(isA(Configuration.class));
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ final String user = "user0";
+ // init application
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ dispatcher.await();
+
+ //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
+ LocalResourcesTracker appTracker =
+ spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user, appId);
+ LocalResourcesTracker privTracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+ user, appId);
+ LocalResourcesTracker pubTracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+ user, appId);
+
+ // init container.
+ final Container c = getMockContainer(appId, 42);
+
+ // init resources
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+
+ // Send localization requests for one resource of each type.
+ final LocalResource privResource = getPrivateMockedResource(r);
+ final LocalResourceRequest privReq =
+ new LocalResourceRequest(privResource);
+
+ final LocalResource pubResource = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+ final LocalResource pubResource2 = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq2 =
+ new LocalResourceRequest(pubResource2);
+
+ final LocalResource appResource = getAppMockedResource(r);
+ final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req.put(LocalResourceVisibility.PRIVATE,
+ Collections.singletonList(privReq));
+ req.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq));
+ req.put(LocalResourceVisibility.APPLICATION,
+ Collections.singletonList(appReq));
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req2.put(LocalResourceVisibility.PRIVATE,
+ Collections.singletonList(privReq));
+ req2.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq2));
+
+ Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+ pubRsrcs.add(pubReq);
+ pubRsrcs.add(pubReq2);
+
+ // Send Request event
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+ dispatcher.await();
+
+ int privRsrcCount = 0;
+ for (LocalizedResource lr : privTracker) {
+ privRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+ Assert.assertEquals(privReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, privRsrcCount);
+
+ int pubRsrcCount = 0;
+ for (LocalizedResource lr : pubTracker) {
+ pubRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ pubRsrcs.remove(lr.getRequest());
+ }
+ Assert.assertEquals(0, pubRsrcs.size());
+ Assert.assertEquals(2, pubRsrcCount);
+
+ int appRsrcCount = 0;
+ for (LocalizedResource lr : appTracker) {
+ appRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ Assert.assertEquals(appReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, appRsrcCount);
+
+ //Send Cleanup Event
+ spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+ req2.remove(LocalResourceVisibility.PRIVATE);
+ spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
+ dispatcher.await();
+
+ pubRsrcs.add(pubReq);
+ pubRsrcs.add(pubReq2);
+
+ privRsrcCount = 0;
+ for (LocalizedResource lr : privTracker) {
+ privRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ Assert.assertEquals(privReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, privRsrcCount);
+
+ pubRsrcCount = 0;
+ for (LocalizedResource lr : pubTracker) {
+ pubRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+ pubRsrcs.remove(lr.getRequest());
+ }
+ Assert.assertEquals(0, pubRsrcs.size());
+ Assert.assertEquals(2, pubRsrcCount);
+
+ appRsrcCount = 0;
+ for (LocalizedResource lr : appTracker) {
+ appRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+ Assert.assertEquals(appReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, appRsrcCount);
+ } finally {
+ dispatcher.stop();
+ delService.stop();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
@@ -149,7 +343,7 @@ public class TestResourceLocalizationSer
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
- conf.setStrings(NM_LOCAL_DIR, sDirs);
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
Server ignore = mock(Server.class);
DrainDispatcher dispatcher = new DrainDispatcher();
@@ -176,9 +370,8 @@ public class TestResourceLocalizationSer
// init application
final Application app = mock(Application.class);
- final ApplicationId appId = mock(ApplicationId.class);
- when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
- when(appId.getId()).thenReturn(3);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
@@ -206,11 +399,13 @@ public class TestResourceLocalizationSer
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
- final LocalResource resource = getMockResource(r);
+ final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
- spyService.handle(new ContainerLocalizationRequestEvent(
- c, Collections.singletonList(req),
- LocalResourceVisibility.PRIVATE));
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(500);
dispatcher.await();
@@ -266,42 +461,44 @@ public class TestResourceLocalizationSer
}
}
- static URL getPath(String path) {
- URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
- when(uri.getScheme()).thenReturn("file");
- when(uri.getHost()).thenReturn(null);
- when(uri.getFile()).thenReturn(path);
- return uri;
+ private static URL getPath(String path) {
+ URL url = BuilderUtils.newURL("file", null, 0, path);
+ return url;
}
- static LocalResource getMockResource(Random r) {
- LocalResource rsrc = mock(LocalResource.class);
-
+ private static LocalResource getMockedResource(Random r,
+ LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
- URL uri = getPath("/local/PRIVATE/" + name);
-
- when(rsrc.getResource()).thenReturn(uri);
- when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
- when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
- when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
- when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
+ URL url = getPath("/local/PRIVATE/" + name);
+ LocalResource rsrc =
+ BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return rsrc;
}
+
+ private static LocalResource getAppMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.APPLICATION);
+ }
+
+ private static LocalResource getPublicMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.PUBLIC);
+ }
+
+ private static LocalResource getPrivateMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.PRIVATE);
+ }
- static Container getMockContainer(ApplicationId appId, int id) {
+ private static Container getMockContainer(ApplicationId appId, int id) {
Container c = mock(Container.class);
- ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(appId);
- appAttemptId.setAttemptId(1);
- ContainerId cId = Records.newRecord(ContainerId.class);
- cId.setAppAttemptId(appAttemptId);
- cId.setAppId(appId);
- cId.setId(id);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds);
+ when(c.toString()).thenReturn(cId.toString());
return c;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Tue Sep 13 22:49:27 2011
@@ -25,8 +25,10 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import junit.framework.Assert;
@@ -47,11 +49,11 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
@@ -92,8 +94,8 @@ public class TestLogAggregationService e
public void testLocalFileDeletionAfterUpload() throws IOException {
this.delSrvc = new DeletionService(createContainerExecutor());
this.delSrvc.init(conf);
- this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
- this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationService logAggregationService =
new LogAggregationService(this.delSrvc);
@@ -118,8 +120,8 @@ public class TestLogAggregationService e
BuilderUtils.newContainerId(recordFactory, application1, appAttemptId, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container11, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container11, 0));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
application1));
@@ -140,8 +142,8 @@ public class TestLogAggregationService e
@Test
public void testNoContainerOnNode() {
- this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
- this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationService logAggregationService =
new LogAggregationService(this.delSrvc);
@@ -173,8 +175,8 @@ public class TestLogAggregationService e
@Test
public void testMultipleAppsLogAggregation() throws IOException {
- this.conf.set(NMConfig.NM_LOG_DIR, localLogDir.getAbsolutePath());
- this.conf.set(NMConfig.REMOTE_USER_LOG_DIR,
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationService logAggregationService =
new LogAggregationService(this.delSrvc);
@@ -192,17 +194,19 @@ public class TestLogAggregationService e
application1, this.user, null,
ContainerLogsRetentionPolicy.ALL_CONTAINERS));
- ApplicationAttemptId appAttemptId1 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ ApplicationAttemptId appAttemptId1 =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application1);
ContainerId container11 =
BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 1);
// Simulate log-file creation
writeContainerLogs(app1LogDir, container11);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container11, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container11, 0));
ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
- ApplicationAttemptId appAttemptId2 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ ApplicationAttemptId appAttemptId2 =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application2);
File app2LogDir =
@@ -214,19 +218,22 @@ public class TestLogAggregationService e
ContainerId container21 =
- BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2, 1);
+ BuilderUtils.newContainerId(recordFactory, application2,
+ appAttemptId2, 1);
writeContainerLogs(app2LogDir, container21);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container21, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container21, 0));
ContainerId container12 =
- BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 2);
+ BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1,
+ 2);
writeContainerLogs(app1LogDir, container12);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container12, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container12, 0));
ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
- ApplicationAttemptId appAttemptId3 = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ ApplicationAttemptId appAttemptId3 =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId1.setApplicationId(application3);
File app3LogDir =
@@ -237,28 +244,32 @@ public class TestLogAggregationService e
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
ContainerId container31 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 1);
+ BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
+ 1);
writeContainerLogs(app3LogDir, container31);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container31, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container31, 0));
ContainerId container32 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 2);
+ BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
+ 2);
writeContainerLogs(app3LogDir, container32);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container32, "1")); // Failed container
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed
ContainerId container22 =
- BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2, 2);
+ BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2,
+ 2);
writeContainerLogs(app2LogDir, container22);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container22, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container22, 0));
ContainerId container33 =
- BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 3);
+ BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
+ 3);
writeContainerLogs(app3LogDir, container33);
- logAggregationService.handle(new LogAggregatorContainerFinishedEvent(
- container33, "0"));
+ logAggregationService.handle(
+ new LogAggregatorContainerFinishedEvent(container33, 0));
logAggregationService.handle(new LogAggregatorAppFinishedEvent(
application2));
@@ -387,8 +398,15 @@ public class TestLogAggregationService e
// ////// Construct the Container-id
ApplicationId appId =
recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(0);
+ appId.setId(0);
+ ApplicationAttemptId appAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ appAttemptId.setApplicationId(appId);
+ appAttemptId.setAttemptId(1);
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
- cId.setAppId(appId);
+ cId.setId(0);
+ cId.setApplicationAttemptId(appAttemptId);
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(this.user);
@@ -404,10 +422,15 @@ public class TestLogAggregationService e
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
- containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
containerLaunchContext.setUser(containerLaunchContext.getUser());
- containerLaunchContext.addCommand("/bin/bash");
- containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
+ List<String> commands = new ArrayList<String>();
+ commands.add("/bin/bash");
+ commands.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Tue Sep 13 22:49:27 2011
@@ -26,6 +26,10 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
import junit.framework.Assert;
@@ -47,6 +51,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
@@ -70,7 +75,7 @@ public class TestContainersMonitor exten
@Before
public void setup() throws IOException {
conf.setClass(
- ContainersMonitorImpl.RESOURCE_CALCULATOR_PLUGIN_CONFIG_KEY,
+ YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
LinuxResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
super.setup();
}
@@ -191,13 +196,15 @@ public class TestContainersMonitor exten
// ////// Construct the Container-id
ApplicationId appId =
recordFactory.newRecordInstance(ApplicationId.class);
- ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ appId.setClusterTimestamp(0);
+ appId.setId(0);
+ ApplicationAttemptId appAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
appAttemptId.setApplicationId(appId);
appAttemptId.setAttemptId(1);
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
- cId.setAppId(appId);
cId.setId(0);
- cId.setAppAttemptId(appAttemptId);
+ cId.setApplicationAttemptId(appAttemptId);
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(user);
@@ -213,10 +220,15 @@ public class TestContainersMonitor exten
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
- containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
containerLaunchContext.setUser(containerLaunchContext.getUser());
- containerLaunchContext.addCommand("/bin/bash");
- containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
+ List<String> commands = new ArrayList<String>();
+ commands.add("/bin/bash");
+ commands.add(scriptFile.getAbsolutePath());
+ containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(8 * 1024 * 1024);
@@ -250,7 +262,7 @@ public class TestContainersMonitor exten
gcsRequest.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(gcsRequest).getStatus();
- Assert.assertEquals(String.valueOf(ExitCode.KILLED.getExitCode()),
+ Assert.assertEquals(ExitCode.KILLED.getExitCode(),
containerStatus.getExitStatus());
String expectedMsgPattern =
"Container \\[pid=" + pid + ",containerID=" + cId
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Tue Sep 13 22:49:27 2011
@@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -72,7 +72,7 @@ public class TestNMWebServer {
};
WebServer server = new WebServer(nmContext, resourceView);
Configuration conf = new Configuration();
- conf.set(NMConfig.NM_LOCAL_DIR, testRootDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
server.init(conf);
server.start();
@@ -112,7 +112,9 @@ public class TestNMWebServer {
};
nmContext.getContainers().put(containerId, container);
//TODO: Gross hack. Fix in code.
- nmContext.getApplications().get(containerId.getAppId()).getContainers()
+ ApplicationId applicationId =
+ containerId.getApplicationAttemptId().getApplicationId();
+ nmContext.getApplications().get(applicationId).getContainers()
.put(containerId, container);
writeContainerLogs(conf, nmContext, containerId);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java Tue Sep 13 22:49:27 2011
@@ -43,7 +43,7 @@ public class AdminSecurityInfo extends S
@Override
public String serverPrincipal() {
- return YarnConfiguration.RM_SERVER_PRINCIPAL_KEY;
+ return YarnConfiguration.RM_PRINCIPAL;
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Tue Sep 13 22:49:27 2011
@@ -83,12 +83,12 @@ public class AdminService extends Abstra
public void init(Configuration conf) {
super.init(conf);
String bindAddress =
- conf.get(RMConfig.ADMIN_ADDRESS,
- RMConfig.DEFAULT_ADMIN_BIND_ADDRESS);
+ // The fallback must be the default address value, not the property key,
+ // otherwise an unset property yields an unparsable bind address.
+ conf.get(YarnConfiguration.RM_ADMIN_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS);
masterServiceAddress = NetUtils.createSocketAddr(bindAddress);
adminAcl =
new AccessControlList(
- conf.get(RMConfig.RM_ADMIN_ACL, RMConfig.DEFAULT_RM_ADMIN_ACL));
+ conf.get(YarnConfiguration.RM_ADMIN_ACL, YarnConfiguration.DEFAULT_RM_ADMIN_ACL));
}
public void start() {
@@ -100,8 +100,8 @@ public class AdminService extends Abstra
this.server =
rpc.getServer(RMAdminProtocol.class, this, masterServiceAddress,
serverConf, null,
- serverConf.getInt(RMConfig.RM_ADMIN_THREADS,
- RMConfig.DEFAULT_RM_ADMIN_THREADS));
+ serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.server.start();
super.start();
}
@@ -219,7 +219,7 @@ public class AdminService extends Abstra
Configuration conf = new Configuration();
adminAcl =
new AccessControlList(
- conf.get(RMConfig.RM_ADMIN_ACL, RMConfig.DEFAULT_RM_ADMIN_ACL));
+ conf.get(YarnConfiguration.RM_ADMIN_ACL, YarnConfiguration.DEFAULT_RM_ADMIN_ACL));
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
"AdminService");
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationACLsManager.java Tue Sep 13 22:49:27 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
@InterfaceAudience.Private
public class ApplicationACLsManager {
@@ -36,7 +37,8 @@ public class ApplicationACLsManager {
}
public boolean areACLsEnabled() {
- return conf.getBoolean(RMConfig.RM_ACLS_ENABLED, false);
+ return conf.getBoolean(YarnConfiguration.RM_ACL_ENABLE,
+ YarnConfiguration.DEFAULT_RM_ACL_ENABLE);
}
/**
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Sep 13 22:49:27 2011
@@ -90,8 +90,8 @@ public class ApplicationMasterService ex
@Override
public void init(Configuration conf) {
String bindAddress =
- conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+ conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
masterServiceAddress = NetUtils.createSocketAddr(bindAddress);
super.init(conf);
}
@@ -105,8 +105,8 @@ public class ApplicationMasterService ex
this.server =
rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
serverConf, this.appTokenManager,
- serverConf.getInt(RMConfig.RM_AM_THREADS,
- RMConfig.DEFAULT_RM_AM_THREADS));
+ serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
this.server.start();
super.start();
}
@@ -232,8 +232,8 @@ public class ApplicationMasterService ex
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
- response.addAllNewContainers(allocation.getContainers());
- response.addAllFinishedContainers(appAttempt
+ response.setAllocatedContainers(allocation.getContainers());
+ response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
response.setAvailableResources(allocation.getResourceLimit());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Sep 13 22:49:27 2011
@@ -31,7 +31,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,10 +57,8 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -74,7 +71,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -95,6 +91,7 @@ public class ClientRMService extends Abs
final private YarnScheduler scheduler;
final private RMContext rmContext;
private final AMLivelinessMonitor amLivelinessMonitor;
+ private final RMAppManager rmAppManager;
private String clientServiceBindAddress;
private Server server;
@@ -104,18 +101,20 @@ public class ClientRMService extends Abs
private ApplicationACLsManager aclsManager;
private Map<ApplicationACL, AccessControlList> applicationACLs;
- public ClientRMService(RMContext rmContext, YarnScheduler scheduler) {
+ public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
+ RMAppManager rmAppManager) {
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
+ this.rmAppManager = rmAppManager;
}
@Override
public void init(Configuration conf) {
clientServiceBindAddress =
- conf.get(YarnConfiguration.APPSMANAGER_ADDRESS,
- YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
+ conf.get(YarnConfiguration.RM_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_ADDRESS);
clientBindAddress =
NetUtils.createSocketAddr(clientServiceBindAddress);
@@ -138,8 +137,8 @@ public class ClientRMService extends Abs
rpc.getServer(ClientRMProtocol.class, this,
clientBindAddress,
clientServerConf, null,
- clientServerConf.getInt(RMConfig.RM_CLIENT_THREADS,
- RMConfig.DEFAULT_RM_CLIENT_THREADS));
+ clientServerConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
this.server.start();
super.start();
}
@@ -205,8 +204,10 @@ public class ClientRMService extends Abs
throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!");
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppManagerSubmitEvent(submissionContext));
+ // This needs to be synchronous as the client can query
+ // immediately following the submission to get the application status.
+ // So call handle directly and do not send an event.
+ rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext));
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user + " with " + submissionContext);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java Tue Sep 13 22:49:27 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@@ -38,11 +39,11 @@ public class NMLivelinessMonitor extends
public void init(Configuration conf) {
super.init(conf);
- setExpireInterval(conf.getInt(RMConfig.NM_EXPIRY_INTERVAL,
- RMConfig.DEFAULT_NM_EXPIRY_INTERVAL));
+ setExpireInterval(conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS));
setMonitorInterval(conf.getInt(
- RMConfig.NMLIVELINESS_MONITORING_INTERVAL,
- RMConfig.DEFAULT_NMLIVELINESS_MONITORING_INTERVAL));
+ YarnConfiguration.RM_NM_LIVENESS_MONITOR_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_LIVENESS_MONITOR_INTERVAL_MS));
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java Tue Sep 13 22:49:27 2011
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
public class NodesListManager extends AbstractService{
@@ -48,18 +49,18 @@ public class NodesListManager extends Ab
try {
this.hostsReader =
new HostsFileReader(
- conf.get(RMConfig.RM_NODES_INCLUDE_FILE,
- RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE),
- conf.get(RMConfig.RM_NODES_EXCLUDE_FILE,
- RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE)
+ conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH),
+ conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)
);
printConfiguredHosts();
} catch (IOException ioe) {
LOG.warn("Failed to init hostsReader, disabling", ioe);
try {
this.hostsReader =
- new HostsFileReader(RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE,
- RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE);
+ new HostsFileReader(YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
} catch (IOException ioe2) {
// Should *never* happen
this.hostsReader = null;
@@ -74,10 +75,10 @@ public class NodesListManager extends Ab
return;
}
- LOG.debug("hostsReader: in=" + conf.get(RMConfig.RM_NODES_INCLUDE_FILE,
- RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE) + " out=" +
- conf.get(RMConfig.RM_NODES_EXCLUDE_FILE,
- RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE));
+ LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
+ conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
for (String include : hostsReader.getHosts()) {
LOG.debug("include: " + include);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1170378&r1=1170377&r2=1170378&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Sep 13 22:49:27 2011
@@ -18,32 +18,29 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
-import java.util.List;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.util.StringUtils;
/**
* This class manages the list of applications for the resource manager.
@@ -52,7 +49,7 @@ public class RMAppManager implements Eve
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
- private int completedAppsMax = RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX;
+ private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
@@ -70,8 +67,8 @@ public class RMAppManager implements Eve
this.masterService = masterService;
this.conf = conf;
setCompletedAppsMax(conf.getInt(
- RMConfig.EXPIRE_APPLICATIONS_COMPLETED_MAX,
- RMConfig.DEFAULT_EXPIRE_APPLICATIONS_COMPLETED_MAX));
+ YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
+ YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
}
/**
@@ -154,7 +151,7 @@ public class RMAppManager implements Eve
}
}
- protected void setCompletedAppsMax(int max) {
+ protected synchronized void setCompletedAppsMax(int max) {
this.completedAppsMax = max;
}
@@ -213,7 +210,7 @@ public class RMAppManager implements Eve
}
}
- protected void submitApplication(ApplicationSubmissionContext submissionContext) {
+ protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
try {