You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/04/29 10:35:56 UTC
svn commit: r1097727 [5/5] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/
yarn/yarn-common/src/main/ja...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,298 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestContainerLocalizer {
+
+ static final Path basedir =
+ new Path("target", TestContainerLocalizer.class.getName());
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testContainerLocalizerMain() throws Exception {
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ // don't actually create dirs
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ final String user = "yak";
+ final String appId = "app_RM_0";
+ final String cId = "container_0";
+ final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+ final Path logDir = lfs.makeQualified(new Path(basedir, "logs"));
+ final List<Path> localDirs = new ArrayList<Path>();
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ }
+ RecordFactory mockRF = getMockLocalizerRecordFactory();
+ ContainerLocalizer concreteLoc = new ContainerLocalizer(lfs, user,
+ appId, cId, logDir, localDirs,
+ new HashMap<LocalResource,Future<Path>>(), mockRF);
+ ContainerLocalizer localizer = spy(concreteLoc);
+
+ // return credential stream instead of opening local file
+ final Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ DataInputBuffer appTokens = createFakeCredentials(r, 10);
+ Path tokenPath =
+ lfs.makeQualified(new Path(
+ String.format(ContainerLocalizer.TOKEN_FILE_FMT, cId)));
+ doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+ ).when(spylfs).open(tokenPath);
+
+ // mock heartbeat responses from NM
+ LocalizationProtocol nmProxy = mock(LocalizationProtocol.class);
+ LocalResource rsrcA = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+ LocalResource rsrcB = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+ LocalResource rsrcC = getMockRsrc(r, LocalResourceVisibility.APPLICATION);
+ LocalResource rsrcD = getMockRsrc(r, LocalResourceVisibility.PRIVATE);
+ when(nmProxy.heartbeat(isA(LocalizerStatus.class)))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+ Collections.singletonList(rsrcA)))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+ Collections.singletonList(rsrcB)))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+ Collections.singletonList(rsrcC)))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+ Collections.singletonList(rsrcD)))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.LIVE,
+ Collections.<LocalResource>emptyList()))
+ .thenReturn(new MockLocalizerHeartbeatResponse(LocalizerAction.DIE,
+ null));
+ doReturn(new FakeDownload(rsrcA.getResource().getFile(), true))
+ .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcA));
+ doReturn(new FakeDownload(rsrcB.getResource().getFile(), true))
+ .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcB));
+ doReturn(new FakeDownload(rsrcC.getResource().getFile(), true))
+ .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcC));
+ doReturn(new FakeDownload(rsrcD.getResource().getFile(), true))
+ .when(localizer).download(isA(LocalDirAllocator.class), eq(rsrcD));
+ doReturn(nmProxy).when(localizer).getProxy(nmAddr);
+ doNothing().when(localizer).sleep(anyInt());
+
+ // return result instantly for deterministic test
+ ExecutorService syncExec = mock(ExecutorService.class);
+ when(syncExec.submit(isA(Callable.class)))
+ .thenAnswer(new Answer<Future<Path>>() {
+ @Override
+ public Future<Path> answer(InvocationOnMock invoc)
+ throws Throwable {
+ Future<Path> done = mock(Future.class);
+ when(done.isDone()).thenReturn(true);
+ FakeDownload d = (FakeDownload) invoc.getArguments()[0];
+ when(done.get()).thenReturn(d.call());
+ return done;
+ }
+ });
+ doReturn(syncExec).when(localizer).createDownloadThreadPool();
+
+ // run localization
+ assertEquals(0, localizer.runLocalization(nmAddr));
+
+ // verify created cache, application dirs
+ for (Path p : localDirs) {
+ Path base = new Path(new Path(p, ContainerLocalizer.USERCACHE), user);
+ Path privcache = new Path(base, ContainerLocalizer.FILECACHE);
+ // $x/usercache/$user/filecache
+ verify(spylfs).mkdir(eq(privcache), isA(FsPermission.class), eq(true));
+ Path appDir =
+ new Path(base, new Path(ContainerLocalizer.APPCACHE, appId));
+ // $x/usercache/$user/appcache/$appId/filecache
+ Path appcache = new Path(appDir, ContainerLocalizer.FILECACHE);
+ verify(spylfs).mkdir(eq(appcache), isA(FsPermission.class), eq(true));
+ // $x/usercache/$user/appcache/$appId/output
+ Path appOutput = new Path(appDir, ContainerLocalizer.OUTPUTDIR);
+ verify(spylfs).mkdir(eq(appOutput), isA(FsPermission.class), eq(true));
+ }
+
+ // verify tokens read at expected location
+ verify(spylfs).open(tokenPath);
+
+ // verify log dir creation
+ verify(spylfs).mkdir(eq(logDir), isA(FsPermission.class), anyBoolean());
+
+ // verify downloaded resources reported to NM
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcA)));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcB)));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcC)));
+ verify(nmProxy).heartbeat(argThat(new HBMatches(rsrcD)));
+
+ // verify all HB use localizerID provided
+ verify(nmProxy, never()).heartbeat(argThat(
+ new ArgumentMatcher<LocalizerStatus>() {
+ @Override
+ public boolean matches(Object o) {
+ LocalizerStatus status = (LocalizerStatus) o;
+ return !cId.equals(status.getLocalizerId());
+ }
+ }));
+ }
+
+ static class HBMatches extends ArgumentMatcher<LocalizerStatus> {
+ final LocalResource rsrc;
+ HBMatches(LocalResource rsrc) {
+ this.rsrc = rsrc;
+ }
+ @Override
+ public boolean matches(Object o) {
+ LocalizerStatus status = (LocalizerStatus) o;
+ for (LocalResourceStatus localized : status.getResources()) {
+ switch (localized.getStatus()) {
+ case FETCH_SUCCESS:
+ if (localized.getLocalPath().getFile().contains(
+ rsrc.getResource().getFile())) {
+ return true;
+ }
+ break;
+ default:
+ fail("Unexpected: " + localized.getStatus());
+ break;
+ }
+ }
+ return false;
+ }
+ }
+
+ static class FakeDownload implements Callable<Path> {
+ private final Path localPath;
+ private final boolean succeed;
+ FakeDownload(String absPath, boolean succeed) {
+ this.localPath = new Path("file:///localcache" + absPath);
+ this.succeed = succeed;
+ }
+ @Override
+ public Path call() throws IOException {
+ if (!succeed) {
+ throw new IOException("FAIL " + localPath);
+ }
+ return localPath;
+ }
+ }
+
+ static RecordFactory getMockLocalizerRecordFactory() {
+ RecordFactory mockRF = mock(RecordFactory.class);
+ when(mockRF.newRecordInstance(same(LocalResourceStatus.class)))
+ .thenAnswer(new Answer<LocalResourceStatus>() {
+ @Override
+ public LocalResourceStatus answer(InvocationOnMock invoc)
+ throws Throwable {
+ return new MockLocalResourceStatus();
+ }
+ });
+ when(mockRF.newRecordInstance(same(LocalizerStatus.class)))
+ .thenAnswer(new Answer<LocalizerStatus>() {
+ @Override
+ public LocalizerStatus answer(InvocationOnMock invoc)
+ throws Throwable {
+ return new MockLocalizerStatus();
+ }
+ });
+ return mockRF;
+ }
+
+ static LocalResource getMockRsrc(Random r,
+ LocalResourceVisibility vis) {
+ LocalResource rsrc = mock(LocalResource.class);
+
+ String name = Long.toHexString(r.nextLong());
+ URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
+ when(uri.getScheme()).thenReturn("file");
+ when(uri.getHost()).thenReturn(null);
+ when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
+
+ when(rsrc.getResource()).thenReturn(uri);
+ when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
+ when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
+ when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
+ when(rsrc.getVisibility()).thenReturn(vis);
+
+ return rsrc;
+ }
+
+ static DataInputBuffer createFakeCredentials(Random r, int nTok)
+ throws IOException {
+ Credentials creds = new Credentials();
+ byte[] password = new byte[20];
+ Text kind = new Text();
+ Text service = new Text();
+ Text alias = new Text();
+ for (int i = 0; i < nTok; ++i) {
+ byte[] identifier = ("idef" + i).getBytes();
+ r.nextBytes(password);
+ kind.set("kind" + i);
+ service.set("service" + i);
+ alias.set("token" + i);
+ Token token = new Token(identifier, password, kind, service);
+ creds.addToken(alias, token);
+ }
+ DataOutputBuffer buf = new DataOutputBuffer();
+ creds.writeTokenStorageToStream(buf);
+ DataInputBuffer ret = new DataInputBuffer();
+ ret.reset(buf.getData(), 0, buf.getLength());
+ return ret;
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java Fri Apr 29 08:35:53 2011
@@ -20,15 +20,11 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -38,7 +34,9 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,8 +56,11 @@ public class TestFSDownload {
fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true);
}
- static org.apache.hadoop.yarn.api.records.LocalResource createFile(FileContext files, Path p, int len, Random r)
- throws IOException, URISyntaxException {
+ static final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ static LocalResource createFile(FileContext files, Path p, int len,
+ Random r) throws IOException, URISyntaxException {
FSDataOutputStream out = null;
try {
byte[] bytes = new byte[len];
@@ -69,7 +70,7 @@ public class TestFSDownload {
} finally {
if (out != null) out.close();
}
- org.apache.hadoop.yarn.api.records.LocalResource ret = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
ret.setSize(len);
ret.setType(LocalResourceType.FILE);
@@ -87,39 +88,32 @@ public class TestFSDownload {
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
- Collection<FSDownload> pending = new ArrayList<FSDownload>();
Random rand = new Random();
long sharedSeed = rand.nextLong();
rand.setSeed(sharedSeed);
System.out.println("SEED: " + sharedSeed);
+
+ Map<LocalResource,Future<Path>> pending =
+ new HashMap<LocalResource,Future<Path>>();
+ ExecutorService exec = Executors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
int[] sizes = new int[10];
for (int i = 0; i < 10; ++i) {
sizes[i] = rand.nextInt(512) + 512;
- org.apache.hadoop.yarn.api.records.LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
+ LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
sizes[i], rand);
FSDownload fsd =
new FSDownload(files, conf, dirs, rsrc, new Random(sharedSeed));
- pending.add(fsd);
+ pending.put(rsrc, exec.submit(fsd));
}
- ExecutorService exec = Executors.newSingleThreadExecutor();
- CompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> queue =
- new ExecutorCompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>(exec);
try {
- for (FSDownload fsd : pending) {
- queue.submit(fsd);
- }
- Map<org.apache.hadoop.yarn.api.records.LocalResource,Path> results = new HashMap();
- for (int i = 0; i < 10; ++i) {
- Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> result = queue.take();
- results.putAll(result.get());
- }
- for (Map.Entry<org.apache.hadoop.yarn.api.records.LocalResource,Path> localized : results.entrySet()) {
+ for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
+ Path localized = p.getValue().get();
assertEquals(
- sizes[Integer.valueOf(localized.getValue().getName())],
- localized.getKey().getSize() - 4096 - 16); // bad DU impl + .crc ; sigh
+ sizes[Integer.valueOf(localized.getName())],
+ p.getKey().getSize() - 4096 - 16); // bad DU impl + .crc ; sigh
}
} catch (ExecutionException e) {
throw new IOException("Failed exec", e);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java Fri Apr 29 08:35:53 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.util.ConverterUtils;
import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
@@ -48,14 +48,14 @@ public class TestLocalResource {
return ret;
}
- static void checkEqual(LocalResource a, LocalResource b) {
+ static void checkEqual(LocalResourceRequest a, LocalResourceRequest b) {
assertEquals(a, b);
assertEquals(a.hashCode(), b.hashCode());
assertEquals(0, a.compareTo(b));
assertEquals(0, b.compareTo(a));
}
- static void checkNotEqual(LocalResource a, LocalResource b) {
+ static void checkNotEqual(LocalResourceRequest a, LocalResourceRequest b) {
assertFalse(a.equals(b));
assertFalse(b.equals(a));
assertFalse(a.hashCode() == b.hashCode());
@@ -75,40 +75,40 @@ public class TestLocalResource {
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
- final LocalResource a = new LocalResource(yA);
- LocalResource b = new LocalResource(yA);
+ final LocalResourceRequest a = new LocalResourceRequest(yA);
+ LocalResourceRequest b = new LocalResourceRequest(yA);
checkEqual(a, b);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkEqual(a, b);
// ignore visibility
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkEqual(a, b);
// ignore size
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkEqual(a, b);
// note path
yB = getYarnResource(
new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
// note type
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
// note timestamp
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
checkNotEqual(a, b);
}
@@ -121,24 +121,24 @@ public class TestLocalResource {
long basetime = r.nextLong() >>> 2;
org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
- final LocalResource a = new LocalResource(yA);
+ final LocalResourceRequest a = new LocalResourceRequest(yA);
// Path primary
org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
- LocalResource b = new LocalResource(yB);
+ LocalResourceRequest b = new LocalResourceRequest(yB);
assertTrue(0 > a.compareTo(b));
// timestamp secondary
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
assertTrue(0 > a.compareTo(b));
// type tertiary
yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC);
- b = new LocalResource(yB);
+ b = new LocalResourceRequest(yB);
assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentMatcher;
+import static org.mockito.Mockito.*;
+
+public class TestLocalizedResource {
+
+ static ContainerId getMockContainer(int id) {
+ ApplicationId appId = mock(ApplicationId.class);
+ when(appId.getClusterTimestamp()).thenReturn(314159265L);
+ when(appId.getId()).thenReturn(3);
+ ContainerId container = mock(ContainerId.class);
+ when(container.getId()).thenReturn(id);
+ when(container.getAppId()).thenReturn(appId);
+ return container;
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testNotification() throws Exception {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(null);
+ try {
+ dispatcher.start();
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ dispatcher.register(LocalizerEventType.class, localizerBus);
+
+ // mock resource
+ LocalResource apiRsrc = createMockResource();
+
+ final ContainerId container0 = getMockContainer(0);
+ final Credentials creds0 = new Credentials();
+ final LocalResourceVisibility vis0 = LocalResourceVisibility.PRIVATE;
+ final LocalizerContext ctxt0 =
+ new LocalizerContext("yak", container0, creds0);
+ LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
+ LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
+ local.handle(new ResourceRequestEvent(rsrcA, vis0, ctxt0));
+ dispatcher.await();
+
+ // Register C0, verify request event
+ LocalizerEventMatcher matchesL0Req =
+ new LocalizerEventMatcher(container0, creds0, vis0,
+ LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+ verify(localizerBus).handle(argThat(matchesL0Req));
+ assertEquals(ResourceState.DOWNLOADING, local.getState());
+
+ // Register C1, verify request event
+ final Credentials creds1 = new Credentials();
+ final ContainerId container1 = getMockContainer(1);
+ final LocalizerContext ctxt1 =
+ new LocalizerContext("yak", container1, creds1);
+ final LocalResourceVisibility vis1 = LocalResourceVisibility.PUBLIC;
+ local.handle(new ResourceRequestEvent(rsrcA, vis1, ctxt1));
+ dispatcher.await();
+ LocalizerEventMatcher matchesL1Req =
+ new LocalizerEventMatcher(container1, creds1, vis1,
+ LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+ verify(localizerBus).handle(argThat(matchesL1Req));
+
+ // Release C0 container localization, verify no notification
+ local.handle(new ResourceReleaseEvent(rsrcA, container0));
+ dispatcher.await();
+ verify(containerBus, never()).handle(isA(ContainerEvent.class));
+ assertEquals(ResourceState.DOWNLOADING, local.getState());
+
+ // Release C1 container localization, verify no notification
+ local.handle(new ResourceReleaseEvent(rsrcA, container1));
+ dispatcher.await();
+ verify(containerBus, never()).handle(isA(ContainerEvent.class));
+ assertEquals(ResourceState.INIT, local.getState());
+
+ // Register C2, C3
+ final ContainerId container2 = getMockContainer(2);
+ final LocalResourceVisibility vis2 = LocalResourceVisibility.PRIVATE;
+ final Credentials creds2 = new Credentials();
+ final LocalizerContext ctxt2 =
+ new LocalizerContext("yak", container2, creds2);
+
+ final ContainerId container3 = getMockContainer(3);
+ final LocalResourceVisibility vis3 = LocalResourceVisibility.PRIVATE;
+ final Credentials creds3 = new Credentials();
+ final LocalizerContext ctxt3 =
+ new LocalizerContext("yak", container3, creds3);
+
+ local.handle(new ResourceRequestEvent(rsrcA, vis2, ctxt2));
+ local.handle(new ResourceRequestEvent(rsrcA, vis3, ctxt3));
+ dispatcher.await();
+ LocalizerEventMatcher matchesL2Req =
+ new LocalizerEventMatcher(container2, creds2, vis2,
+ LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+ verify(localizerBus).handle(argThat(matchesL2Req));
+ LocalizerEventMatcher matchesL3Req =
+ new LocalizerEventMatcher(container3, creds3, vis3,
+ LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
+ verify(localizerBus).handle(argThat(matchesL3Req));
+
+ // Successful localization. verify notification C2, C3
+ Path locA = new Path("file:///cache/rsrcA");
+ local.handle(new ResourceLocalizedEvent(rsrcA, locA, 10));
+ dispatcher.await();
+ ContainerEventMatcher matchesC2Localized =
+ new ContainerEventMatcher(container2,
+ ContainerEventType.RESOURCE_LOCALIZED);
+ ContainerEventMatcher matchesC3Localized =
+ new ContainerEventMatcher(container3,
+ ContainerEventType.RESOURCE_LOCALIZED);
+ verify(containerBus).handle(argThat(matchesC2Localized));
+ verify(containerBus).handle(argThat(matchesC3Localized));
+ assertEquals(ResourceState.LOCALIZED, local.getState());
+
+ // Register C4, verify notification
+ final ContainerId container4 = getMockContainer(4);
+ final Credentials creds4 = new Credentials();
+ final LocalizerContext ctxt4 =
+ new LocalizerContext("yak", container4, creds4);
+ final LocalResourceVisibility vis4 = LocalResourceVisibility.PRIVATE;
+ local.handle(new ResourceRequestEvent(rsrcA, vis4, ctxt4));
+ dispatcher.await();
+ ContainerEventMatcher matchesC4Localized =
+ new ContainerEventMatcher(container4,
+ ContainerEventType.RESOURCE_LOCALIZED);
+ verify(containerBus).handle(argThat(matchesC4Localized));
+ assertEquals(ResourceState.LOCALIZED, local.getState());
+ } finally {
+ dispatcher.stop();
+ }
+ }
+
+ @Test
+ public void testDirectLocalization() throws Exception {
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(null);
+ try {
+ dispatcher.start();
+ LocalResource apiRsrc = createMockResource();
+ LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
+ LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
+ Path p = new Path("file:///cache/rsrcA");
+ local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
+ dispatcher.await();
+ assertEquals(ResourceState.LOCALIZED, local.getState());
+ } finally {
+ dispatcher.stop();
+ }
+ }
+
+ static LocalResource createMockResource() {
+ // mock rsrc location
+ org.apache.hadoop.yarn.api.records.URL uriA =
+ mock(org.apache.hadoop.yarn.api.records.URL.class);
+ when(uriA.getScheme()).thenReturn("file");
+ when(uriA.getHost()).thenReturn(null);
+ when(uriA.getFile()).thenReturn("/localA/rsrc");
+
+ LocalResource apiRsrc = mock(LocalResource.class);
+ when(apiRsrc.getResource()).thenReturn(uriA);
+ when(apiRsrc.getTimestamp()).thenReturn(4344L);
+ when(apiRsrc.getType()).thenReturn(LocalResourceType.FILE);
+ return apiRsrc;
+ }
+
+
+ static class LocalizerEventMatcher extends ArgumentMatcher<LocalizerEvent> {
+ Credentials creds;
+ LocalResourceVisibility vis;
+ private final ContainerId idRef;
+ private final LocalizerEventType type;
+
+ public LocalizerEventMatcher(ContainerId idRef, Credentials creds,
+ LocalResourceVisibility vis, LocalizerEventType type) {
+ this.vis = vis;
+ this.type = type;
+ this.creds = creds;
+ this.idRef = idRef;
+ }
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof LocalizerResourceRequestEvent)) return false;
+ LocalizerResourceRequestEvent evt = (LocalizerResourceRequestEvent) o;
+ return idRef == evt.getContext().getContainer()
+ && type == evt.getType()
+ && vis == evt.getVisibility()
+ && creds == evt.getContext().getCredentials();
+ }
+ }
+
+ static class ContainerEventMatcher extends ArgumentMatcher<ContainerEvent> {
+ private final ContainerId idRef;
+ private final ContainerEventType type;
+ public ContainerEventMatcher(ContainerId idRef, ContainerEventType type) {
+ this.idRef = idRef;
+ this.type = type;
+ }
+ @Override
+ public boolean matches(Object o) {
+ if (!(o instanceof ContainerEvent)) return false;
+ ContainerEvent evt = (ContainerEvent) o;
+ return idRef == evt.getContainerID() && type == evt.getType();
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,80 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.mockito.Mockito.*;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
+
+public class TestResourceLocalizationService {
+
+ static final Path basedir =
+ new Path("target", TestResourceLocalizationService.class.getName());
+
+ @Test
+ public void testLocalizationInit() throws Exception {
+ final Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(null);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = spy(new DeletionService(exec));
+ delService.init(null);
+ delService.start();
+
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+ ResourceLocalizationService locService =
+ spy(new ResourceLocalizationService(dispatcher, exec, delService));
+ doReturn(lfs)
+ .when(locService).getLocalFileContext(isA(Configuration.class));
+ try {
+ dispatcher.start();
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(NM_LOCAL_DIR, sDirs);
+
+ // initialize ResourceLocalizationService
+ locService.init(conf);
+
+ // verify directory creation
+ for (Path p : localDirs) {
+ Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
+ verify(spylfs)
+ .mkdir(eq(usercache), isA(FsPermission.class), eq(true));
+ Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
+ verify(spylfs)
+ .mkdir(eq(publicCache), isA(FsPermission.class), eq(true));
+ Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
+ verify(spylfs).mkdir(eq(nmPriv),
+ eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
+ }
+ } finally {
+ dispatcher.stop();
+ delService.stop();
+ }
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Fri Apr 29 08:35:53 2011
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.util.Build
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Before;
import org.junit.Test;
+import static org.mockito.Mockito.*;
public class TestNMWebServer {
@@ -84,16 +84,12 @@ public class TestNMWebServer {
Dispatcher dispatcher = new AsyncDispatcher();
String user = "nobody";
long clusterTimeStamp = 1234;
- Map<String, String> env = new HashMap<String, String>();
- Map<String, LocalResource> resources =
- new HashMap<String, LocalResource>();
- ByteBuffer containerTokens = ByteBuffer.allocate(0);
ApplicationId appId =
BuilderUtils.newApplicationId(recordFactory, clusterTimeStamp, 1);
- Application app =
- new ApplicationImpl(dispatcher, user, appId, env, resources,
- containerTokens);
- nmContext.getApplications().put(appId, app);
+ Application app = mock(Application.class);
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ nmContext.getApplications().put(appId, app);
ContainerId container1 =
BuilderUtils.newContainerId(recordFactory, appId, 0);
ContainerId container2 =
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Fri Apr 29 08:35:53 2011
@@ -24,15 +24,16 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -156,12 +157,8 @@ public class MiniYARNCluster extends Com
};
@Override
- protected
- org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater
- createNodeStatusUpdater(
- org.apache.hadoop.yarn.server.nodemanager.Context context,
- Dispatcher dispatcher,
- NodeHealthCheckerService healthChecker) {
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher,
healthChecker) {
@Override