You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [32/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.local.LocalFs;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
+
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+import org.apache.hadoop.yarn.URL;
+
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import static org.mockito.Mockito.*;
+
+public class TestApplicationLocalizer {
+
+ static final Path basedir =
+ new Path("target", TestApplicationLocalizer.class.getName());
+
+ private static final FsPermission urwx =
+ FsPermission.createImmutable((short) 0700);
+ private static final FsPermission urwx_gx =
+ FsPermission.createImmutable((short) 0710);
+
+ static DataInputBuffer createFakeCredentials(Random r, int nTok)
+ throws IOException {
+ Credentials creds = new Credentials();
+ byte[] password = new byte[20];
+ Text kind = new Text();
+ Text service = new Text();
+ Text alias = new Text();
+ for (int i = 0; i < nTok; ++i) {
+ byte[] identifier = ("idef" + i).getBytes();
+ r.nextBytes(password);
+ kind.set("kind" + i);
+ service.set("service" + i);
+ alias.set("token" + i);
+ Token token = new Token(identifier, password, kind, service);
+ creds.addToken(alias, token);
+ }
+ DataOutputBuffer buf = new DataOutputBuffer();
+ creds.writeTokenStorageToStream(buf);
+ DataInputBuffer ret = new DataInputBuffer();
+ ret.reset(buf.getData(), 0, buf.getLength());
+ return ret;
+ }
+
+ static Collection<LocalResource> createFakeResources(Random r, int nFiles,
+ Map<Long,LocalResource> sizes) throws IOException {
+ ArrayList<LocalResource> rsrc = new ArrayList<LocalResource>();
+ long basetime = r.nextLong() >>> 2;
+ for (int i = 0; i < nFiles; ++i) {
+ LocalResource resource = new LocalResource();
+ URL path = new URL();
+ path.scheme = "file";
+ path.host = null;
+ path.port = 0;
+ resource.timestamp = basetime + i;
+ r.setSeed(resource.timestamp);
+ sizes.put(r.nextLong() & Long.MAX_VALUE, resource);
+ StringBuilder sb = new StringBuilder("/" + r.nextLong());
+ while (r.nextInt(2) == 1) {
+ sb.append("/" + r.nextLong());
+ }
+ path.file = sb.toString();
+ resource.resource = path;
+ resource.size = -1;
+ resource.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
+ resource.state = PRIVATE;
+ rsrc.add(resource);
+ }
+ return rsrc;
+ }
+
+ static DataInputBuffer writeFakeAppFiles(Collection<LocalResource> rsrc)
+ throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ApplicationLocalizer.writeResourceDescription(dob, rsrc);
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(dob.getData(), 0, dob.getLength());
+ return dib;
+ }
+
+ @Test
+ public void testLocalizationMain() throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ // don't actually create dirs
+ doNothing().when(spylfs).mkdir(Matchers.<Path>anyObject(),
+ Matchers.<FsPermission>anyObject(), anyBoolean());
+ FileContext lfs = FileContext.getFileContext(spylfs, conf);
+
+ // TODO mocked FileContext requires relative paths; LTC will provide abs
+ List<Path> localDirs = new ArrayList<Path>();
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(new Path(basedir,
+ new Path(i + "", ApplicationLocalizer.USERCACHE)));
+ }
+
+ final Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ // return credential stream instead of opening local file
+ DataInputBuffer appTokens = createFakeCredentials(r, 10);
+ Path tokenPath =
+ lfs.makeQualified(new Path(ApplicationLocalizer.APPTOKEN_FILE));
+ doReturn(new FSDataInputStream(new FakeFSDataInputStream(appTokens))
+ ).when(spylfs).open(tokenPath);
+ // return file stream instead of opening local file
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ final HashMap<Long,LocalResource> sizes = new HashMap<Long,LocalResource>();
+ Collection<LocalResource> resources = createFakeResources(r, 10, sizes);
+ DataInputBuffer appFiles = writeFakeAppFiles(resources);
+ Path filesPath =
+ lfs.makeQualified(new Path(ApplicationLocalizer.FILECACHE_FILE));
+ doReturn(new FSDataInputStream(new FakeFSDataInputStream(appFiles))
+ ).when(spylfs).open(filesPath);
+
+ final String user = "yak";
+ final String appId = "app_RM_0";
+ final InetSocketAddress nmAddr = new InetSocketAddress("foobar", 4344);
+ final Path logDir = new Path(basedir, "logs");
+ ApplicationLocalizer localizer = new ApplicationLocalizer(lfs, user,
+ appId, logDir, localDirs);
+ ApplicationLocalizer spyLocalizer = spy(localizer);
+ LocalizationProtocol mockLocalization = mock(LocalizationProtocol.class);
+ FSDownload mockDownload = mock(FSDownload.class);
+
+ // set to return mocks
+ doReturn(mockLocalization).when(spyLocalizer).getProxy(nmAddr);
+ for (Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
+ doReturn(new FalseDownload(rsrc.getValue(), rsrc.getKey())
+ ).when(spyLocalizer).download(Matchers.<LocalDirAllocator>anyObject(),
+ argThat(new LocalResourceMatches(rsrc.getValue())));
+ }
+ assertEquals(0, spyLocalizer.runLocalization(nmAddr));
+
+ // verify app files opened
+ verify(spylfs).open(tokenPath);
+ verify(spylfs).open(filesPath);
+ ArgumentMatcher<CharSequence> userMatch =
+ new ArgumentMatcher<CharSequence>() {
+ @Override
+ public boolean matches(Object o) {
+ return "yak".equals(o.toString());
+ }
+ };
+ for (final Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
+ ArgumentMatcher<LocalResource> localizedMatch =
+ new ArgumentMatcher<LocalResource>() {
+ @Override
+ public boolean matches(Object o) {
+ LocalResource other = (LocalResource) o;
+ r.setSeed(rsrc.getValue().timestamp);
+ boolean ret = (r.nextLong() & Long.MAX_VALUE) == other.size;
+ StringBuilder sb = new StringBuilder("/" + r.nextLong());
+ while (r.nextInt(2) == 1) {
+ sb.append("/" + r.nextLong());
+ }
+ ret &= other.resource.file.toString().equals(sb.toString());
+ ret &= other.type.equals(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+ return ret;
+ }
+ };
+ ArgumentMatcher<URL> dstMatch =
+ new ArgumentMatcher<URL>() {
+ @Override
+ public boolean matches(Object o) {
+ r.setSeed(rsrc.getValue().timestamp);
+ return ((URL)o).file.toString().equals(
+ "/done/" + (r.nextLong() & Long.MAX_VALUE));
+ }
+ };
+ verify(mockLocalization).successfulLocalization(
+ argThat(userMatch), argThat(localizedMatch), argThat(dstMatch));
+ }
+ }
+
+ static class FalseDownload extends FSDownload {
+ private final long size;
+ public FalseDownload(LocalResource resource, long size) {
+ super(null, null, null, resource, null);
+ this.size = size;
+ }
+ @Override
+ public Map<LocalResource,Path> call() {
+ LocalResource ret = getResource();
+ ret.size = size;
+ return Collections.singletonMap(ret, new Path("/done/" + size));
+ }
+ }
+
+ // sigh.
+ class LocalResourceMatches extends ArgumentMatcher<LocalResource> {
+ final LocalResource rsrc;
+ LocalResourceMatches(LocalResource rsrc) {
+ this.rsrc = rsrc;
+ }
+ @Override
+ public boolean matches(Object o) {
+ return rsrc.timestamp == ((LocalResource)o).timestamp;
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,131 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import static org.apache.hadoop.fs.CreateFlag.*;
+
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestFSDownload {
+
+ @AfterClass
+ public static void deleteTestDir() throws IOException {
+ FileContext fs = FileContext.getLocalFSFileContext();
+ fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true);
+ }
+
+ static LocalResource createFile(FileContext files, Path p, int len, Random r)
+ throws IOException, URISyntaxException {
+ FSDataOutputStream out = null;
+ try {
+ byte[] bytes = new byte[len];
+ out = files.create(p, EnumSet.of(CREATE, OVERWRITE));
+ r.nextBytes(bytes);
+ out.write(bytes);
+ } finally {
+ if (out != null) out.close();
+ }
+ LocalResource ret = new LocalResource();
+ ret.resource = AvroUtil.getYarnUrlFromPath(p);
+ ret.size = len;
+ ret.type = LocalResourceType.FILE;
+ ret.timestamp = files.getFileStatus(p).getModificationTime();
+ return ret;
+ }
+
+ @Test
+ public void testDownload() throws IOException, URISyntaxException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ Collection<FSDownload> pending = new ArrayList<FSDownload>();
+ Random rand = new Random();
+ long sharedSeed = rand.nextLong();
+ rand.setSeed(sharedSeed);
+ System.out.println("SEED: " + sharedSeed);
+ LocalDirAllocator dirs =
+ new LocalDirAllocator(TestFSDownload.class.getName());
+ int[] sizes = new int[10];
+ for (int i = 0; i < 10; ++i) {
+ sizes[i] = rand.nextInt(512) + 512;
+ LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
+ sizes[i], rand);
+ FSDownload fsd =
+ new FSDownload(files, conf, dirs, rsrc, new Random(sharedSeed));
+ pending.add(fsd);
+ }
+
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ CompletionService<Map<LocalResource,Path>> queue =
+ new ExecutorCompletionService<Map<LocalResource,Path>>(exec);
+ try {
+ for (FSDownload fsd : pending) {
+ queue.submit(fsd);
+ }
+ Map<LocalResource,Path> results = new HashMap();
+ for (int i = 0; i < 10; ++i) {
+ Future<Map<LocalResource,Path>> result = queue.take();
+ results.putAll(result.get());
+ }
+ for (Map.Entry<LocalResource,Path> localized : results.entrySet()) {
+ assertEquals(
+ sizes[Integer.valueOf(localized.getValue().getName())],
+ localized.getKey().size - 4096 - 16); // bad DU impl + .crc ; sigh
+ }
+ } catch (ExecutionException e) {
+ throw new IOException("Failed exec", e);
+ } finally {
+ exec.shutdown();
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,144 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLocalResource {
+
+ static org.apache.hadoop.yarn.LocalResource getYarnResource(Path p, long size,
+ long timestamp, LocalResourceType type, LocalResourceVisibility state)
+ throws URISyntaxException {
+ org.apache.hadoop.yarn.LocalResource ret = new org.apache.hadoop.yarn.LocalResource();
+ ret.resource = AvroUtil.getYarnUrlFromURI(p.toUri());
+ ret.size = size;
+ ret.timestamp = timestamp;
+ ret.type = type;
+ ret.state = state;
+ return ret;
+ }
+
+ static void checkEqual(LocalResource a, LocalResource b) {
+ assertEquals(a, b);
+ assertEquals(a.hashCode(), b.hashCode());
+ assertEquals(0, a.compareTo(b));
+ assertEquals(0, b.compareTo(a));
+ }
+
+ static void checkNotEqual(LocalResource a, LocalResource b) {
+ assertFalse(a.equals(b));
+ assertFalse(b.equals(a));
+ assertFalse(a.hashCode() == b.hashCode());
+ assertFalse(0 == a.compareTo(b));
+ assertFalse(0 == b.compareTo(a));
+ }
+
+ @Test
+ public void testResourceEquality() throws URISyntaxException {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+
+ long basetime = r.nextLong() >>> 2;
+ org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+ org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+ final LocalResource a = new LocalResource(yA);
+ LocalResource b = new LocalResource(yA);
+ checkEqual(a, b);
+ b = new LocalResource(yB);
+ checkEqual(a, b);
+
+ // ignore visibility
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PRIVATE);
+ b = new LocalResource(yB);
+ checkEqual(a, b);
+
+ // ignore size
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), 0, basetime, FILE, PRIVATE);
+ b = new LocalResource(yB);
+ checkEqual(a, b);
+
+ // note path
+ yB = getYarnResource(
+ new Path("hdfs://dingo.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
+ b = new LocalResource(yB);
+ checkNotEqual(a, b);
+
+ // note type
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), 0, basetime, ARCHIVE, PUBLIC);
+ b = new LocalResource(yB);
+ checkNotEqual(a, b);
+
+ // note timestamp
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), 0, basetime + 1, FILE, PUBLIC);
+ b = new LocalResource(yB);
+ checkNotEqual(a, b);
+ }
+
+ @Test
+ public void testResourceOrder() throws URISyntaxException {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ long basetime = r.nextLong() >>> 2;
+ org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
+ final LocalResource a = new LocalResource(yA);
+
+ // Path primary
+ org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+ new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
+ LocalResource b = new LocalResource(yB);
+ assertTrue(0 > a.compareTo(b));
+
+ // timestamp secondary
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime + 1, FILE, PUBLIC);
+ b = new LocalResource(yB);
+ assertTrue(0 > a.compareTo(b));
+
+ // type tertiary
+ yB = getYarnResource(
+ new Path("http://yak.org:80/foobar"), -1, basetime, ARCHIVE, PUBLIC);
+ b = new LocalResource(yB);
+ assertTrue(0 != a.compareTo(b)); // don't care about order, just ne
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,220 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.*;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.AppLocalizationRunnerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import static org.apache.hadoop.yarn.LocalResourceType.*;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Mockito.*;
+
+public class TestLocalResources {
+
+ private static List<org.apache.hadoop.yarn.LocalResource>
+ randResources(Random r, int nRsrc) throws URISyntaxException {
+ final List<org.apache.hadoop.yarn.LocalResource> ret =
+ new ArrayList<org.apache.hadoop.yarn.LocalResource>(nRsrc);
+ Path base = new Path("file:///foo/bar");
+ long basetime = r.nextLong() >>> 2;
+ for (int i = 0; i < nRsrc; ++i) {
+ org.apache.hadoop.yarn.LocalResource rsrc = new org.apache.hadoop.yarn.LocalResource();
+ rsrc.timestamp = basetime + i;
+ r.setSeed(rsrc.timestamp);
+ Path p = new Path(base, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
+ while (r.nextInt(2) == 1) {
+ p = new Path(p, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
+ }
+ rsrc.resource = AvroUtil.getYarnUrlFromPath(p);
+ rsrc.size = -1;
+ rsrc.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
+ rsrc.state = PRIVATE;
+
+ System.out.println("RSRC: " + rsrc);
+ ret.add(rsrc);
+ }
+ return ret;
+ }
+
+ private static void verify(
+ BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue,
+ Collection<org.apache.hadoop.yarn.LocalResource> files)
+ throws ExecutionException, InterruptedException, URISyntaxException {
+ for (Future<Map<LocalResource,Path>> f = doneQueue.poll(); f != null;
+ f = doneQueue.poll()) {
+ Map<LocalResource,Path> q = f.get();
+ assertEquals(1, q.size());
+ for (Map.Entry<LocalResource,Path> loc : q.entrySet()) {
+ boolean found = false;
+ for (org.apache.hadoop.yarn.LocalResource yrsrc : files) {
+ LocalResource rsrc = new LocalResource(yrsrc);
+ found |= rsrc.equals(loc.getKey());
+ }
+ assertTrue(found);
+ assertEquals(new Path("file:///yak/" + loc.getKey().getTimestamp()),
+ loc.getValue());
+ }
+ }
+ }
+
+ private static AppLocalizationRunnerImpl getMockAppLoc(
+ final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue,
+ String name) {
+ AppLocalizationRunnerImpl mockAppLoc = mock(AppLocalizationRunnerImpl.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ doneQueue.offer(
+ (Future<Map<LocalResource,Path>>)invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(mockAppLoc).localizedResource(
+ Matchers.<Future<Map<LocalResource,Path>>>anyObject());
+ when(mockAppLoc.toString()).thenReturn(name);
+ return mockAppLoc;
+ }
+
+ static void successfulLoc(LocalResourcesTrackerImpl rsrcMap,
+ List<org.apache.hadoop.yarn.LocalResource> rsrc)
+ throws InterruptedException, URISyntaxException {
+ long i = 0;
+ for (org.apache.hadoop.yarn.LocalResource yRsrc : rsrc) {
+ yRsrc.size = ++i;
+ rsrcMap.setSuccess(new LocalResource(yRsrc), yRsrc.size,
+ new Path("file:///yak/" + yRsrc.timestamp));
+ }
+ }
+
+ static void failedLoc(
+ BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue) {
+ try {
+ Future<Map<LocalResource,Path>> f = doneQueue.poll();
+ f.get();
+ fail("Failed localization succeeded");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail("Wrong exception");
+ } catch (ExecutionException e) {
+ /* expected */
+ }
+ }
+
+ @Test
+ public void testLocalResourceAcquire() throws Exception {
+ final int NUM_EXP = 4;
+ final int NUM_URIS = 1 << NUM_EXP;
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("SEED: " + seed);
+ // shared resource map
+ LocalResourcesTrackerImpl rsrcMap = new LocalResourcesTrackerImpl();
+ List<org.apache.hadoop.yarn.LocalResource> resourcesA = randResources(r, NUM_URIS);
+
+ // set up application A mocks
+ final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueueA =
+ new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>();
+ AppLocalizationRunnerImpl mockAppLocA = getMockAppLoc(doneQueueA, "A");
+
+ // set up application B mocks
+ final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueueB =
+ new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>();
+ AppLocalizationRunnerImpl mockAppLocB = getMockAppLoc(doneQueueB, "B");
+
+ Collection<org.apache.hadoop.yarn.LocalResource> todoA =
+ rsrcMap.register(mockAppLocA, resourcesA);
+ // ensure no rsrc added until reported back
+ assertEquals(NUM_URIS, todoA.size());
+ assertTrue(doneQueueA.isEmpty());
+
+ // complete half A localization
+ successfulLoc(rsrcMap, resourcesA.subList(0, NUM_URIS >>> 1));
+
+ // verify callback
+ assertEquals(NUM_URIS >>> 1, doneQueueA.size());
+ verify(doneQueueA, todoA);
+
+ // set up B localization as 1/4 localized A rsrc, 1/4 non-localized A rsrc,
+ // 1/2 new rsrc
+ long seed2 = r.nextLong();
+ r.setSeed(seed2);
+ System.out.println("SEED: " + seed2);
+ List<org.apache.hadoop.yarn.LocalResource> resourcesB =
+ randResources(r, NUM_URIS >>> 1);
+ resourcesB.addAll(resourcesA.subList(NUM_URIS >>> 2,
+ NUM_URIS - (NUM_URIS >>> 2)));
+ Collection<org.apache.hadoop.yarn.LocalResource> todoB =
+ rsrcMap.register(mockAppLocB, resourcesB);
+ // all completed A rsrc
+ assertEquals(NUM_URIS >>> 2, doneQueueB.size());
+ // only uniq rsrc, not in A
+ assertEquals(NUM_URIS >>> 1, todoB.size());
+ // verify completed rsrc in B done queue completed by A
+ verify(doneQueueB, todoA);
+
+ // complete A localization, less 1 shared rsrc
+ successfulLoc(rsrcMap, resourcesA.subList((NUM_URIS >>> 1) + 1, NUM_URIS));
+ // completed A rsrc in B
+ assertEquals((NUM_URIS >>> 2) - 1, doneQueueB.size());
+ verify(doneQueueB, todoA);
+ assertEquals((NUM_URIS >>> 1) - 1, doneQueueA.size());
+ verify(doneQueueA, todoA);
+
+ // fail shared localization
+ rsrcMap.removeFailedResource(new LocalResource(resourcesA.get(NUM_URIS >>> 1)),
+ RPCUtil.getRemoteException(new IOException("FAIL!")));
+ assertEquals(1, doneQueueA.size());
+ assertEquals(1, doneQueueB.size());
+ failedLoc(doneQueueA);
+ failedLoc(doneQueueB);
+
+ // verify cleared
+ Collection<org.apache.hadoop.yarn.LocalResource> todoA2 =
+ rsrcMap.register(mockAppLocA, Collections.singletonList(
+ resourcesA.get(NUM_URIS >>> 1)));
+ assertEquals(1, todoA2.size());
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<project>
+ <parent>
+ <artifactId>yarn-server</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>${yarn.version}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-resourcemanager</artifactId>
+ <name>yarn-server-resourcemanager</name>
+ <version>${yarn.version}</version>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <!-- begin MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-api</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-common</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <!-- end MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-common</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,189 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationMasterHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationTrackerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+@Private
+public class ApplicationMasterService extends AbstractService implements
+AMRMProtocol, EventHandler<ASMEvent<ApplicationTrackerEventType>> {
+ private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
+ private ApplicationMasterHandler applicationsManager;
+ private YarnScheduler rScheduler;
+ private ApplicationTokenSecretManager appTokenManager;
+ private InetSocketAddress masterServiceAddress;
+ private Server server;
+ private Map<ApplicationID, AMResponse> responseMap =
+ new HashMap<ApplicationID, AMResponse>();
+ private final AMResponse reboot = new AMResponse();
+ private final ASMContext asmContext;
+
+ public ApplicationMasterService(ApplicationTokenSecretManager appTokenManager,
+ ApplicationMasterHandler applicationsManager, YarnScheduler scheduler, ASMContext asmContext) {
+ super(ApplicationMasterService.class.getName());
+ this.appTokenManager = appTokenManager;
+ this.applicationsManager = applicationsManager;
+ this.rScheduler = scheduler;
+ this.reboot.reboot = true;
+ this.reboot.containers = new ArrayList<Container>();
+ this.asmContext = asmContext;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ String bindAddress =
+ conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+ masterServiceAddress = NetUtils.createSocketAddr(bindAddress);
+ this.asmContext.getDispatcher().register(ApplicationTrackerEventType.class, this);
+ super.init(conf);
+ }
+
+ public void start() {
+ YarnRPC rpc = YarnRPC.create(getConfig());
+ Configuration serverConf = new Configuration(getConfig());
+ serverConf.setClass(
+ CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ SchedulerSecurityInfo.class, SecurityInfo.class);
+ this.server =
+ rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
+ serverConf, this.appTokenManager);
+ this.server.start();
+ super.start();
+ }
+ @Override
+ public Void registerApplicationMaster(ApplicationMaster applicationMaster)
+ throws AvroRemoteException {
+ try {
+ applicationsManager.registerApplicationMaster(applicationMaster);
+ } catch(IOException ie) {
+ LOG.info("Exception registering application ", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+ return null;
+ }
+
+ @Override
+ public Void finishApplicationMaster(ApplicationMaster applicationMaster)
+ throws AvroRemoteException {
+ try {
+ applicationsManager.finishApplicationMaster(applicationMaster);
+ } catch(IOException ie) {
+ LOG.info("Exception finishing application", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+ return null;
+ }
+
+ @Override
+ public AMResponse allocate(ApplicationStatus status,
+ List<ResourceRequest> ask, List<Container> release)
+ throws AvroRemoteException {
+ try {
+ /* check if its in cache */
+ synchronized(responseMap) {
+ AMResponse lastResponse = responseMap.get(status.applicationId);
+ if (lastResponse == null) {
+ LOG.error("Application doesnt exist in cache " + status.applicationId);
+ return reboot;
+ }
+ if ((status.responseID + 1) == lastResponse.responseId) {
+ /* old heartbeat */
+ return lastResponse;
+ } else if (status.responseID + 1 < lastResponse.responseId) {
+ LOG.error("Invalid responseid from application " + status.applicationId);
+ return reboot;
+ }
+ applicationsManager.applicationHeartbeat(status);
+ List<Container> containers = rScheduler.allocate(status.applicationId, ask, release);
+ AMResponse response = new AMResponse();
+ response.containers = containers;
+ response.responseId = lastResponse.responseId + 1;
+ responseMap.put(status.applicationId, response);
+ return response;
+ }
+ } catch(IOException ie) {
+ LOG.info("Error in allocation for " + status.applicationId, ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (this.server != null) {
+ this.server.close();
+ }
+ super.stop();
+ }
+
+ @Override
+ public void handle(ASMEvent<ApplicationTrackerEventType> appEvent) {
+ ApplicationTrackerEventType event = appEvent.getType();
+ ApplicationID id = appEvent.getAppContext().getApplicationID();
+ synchronized(responseMap) {
+ switch (event) {
+ case ADD:
+ AMResponse response = new AMResponse();
+ response.containers = null;
+ response.responseId = 0;
+ responseMap.put(id, response);
+ break;
+ case REMOVE:
+ responseMap.remove(id);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,137 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTracker;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+/**
+ * The client interface to the Resource Manager. This module handles all the rpc
+ * interfaces to the resource manager from the client.
+ */
+public class ClientRMService extends AbstractService implements ClientRMProtocol {
+ private static final Log LOG = LogFactory.getLog(ClientRMService.class);
+ private RMResourceTracker clusterInfo;
+ private ApplicationsManager applicationsManager;
+ private String clientServiceBindAddress;
+ private Server server;
+ InetSocketAddress clientBindAddress;
+
+ public ClientRMService(ApplicationsManager applicationsManager,
+ RMResourceTracker clusterInfo) {
+ super(ClientRMService.class.getName());
+ this.clusterInfo = clusterInfo;
+ this.applicationsManager = applicationsManager;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ clientServiceBindAddress =
+ conf.get(YarnConfiguration.APPSMANAGER_ADDRESS,
+ YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
+ clientBindAddress =
+ NetUtils.createSocketAddr(clientServiceBindAddress);
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ // All the clients to appsManager are supposed to be authenticated via
+ // Kerberos if security is enabled, so no secretManager.
+ YarnRPC rpc = YarnRPC.create(getConfig());
+ Configuration clientServerConf = new Configuration(getConfig());
+ clientServerConf.setClass(
+ CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ ClientRMSecurityInfo.class, SecurityInfo.class);
+ this.server =
+ rpc.getServer(ClientRMProtocol.class, this,
+ clientBindAddress,
+ clientServerConf, null);
+ this.server.start();
+ super.start();
+ }
+
+ @Override
+ public ApplicationID getNewApplicationId() throws AvroRemoteException {
+ return applicationsManager.getNewApplicationID();
+ }
+
+ @Override
+ public ApplicationMaster getApplicationMaster(ApplicationID applicationId)
+ throws AvroRemoteException {
+ return applicationsManager.getApplicationMaster(applicationId);
+ }
+
+ @Override
+ public Void submitApplication(ApplicationSubmissionContext context)
+ throws AvroRemoteException {
+ try {
+ applicationsManager.submitApplication(context);
+ } catch (IOException ie) {
+ LOG.info("Exception in submitting application", ie);
+ throw RPCUtil.getRemoteException(ie);
+ }
+ return null;
+ }
+
+ @Override
+ public Void finishApplication(ApplicationID applicationId)
+ throws AvroRemoteException {
+ try {
+ applicationsManager.finishApplication(applicationId);
+ } catch(IOException ie) {
+ LOG.info("Error finishing application ", ie);
+ }
+ return null;
+ }
+
+ @Override
+ public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException {
+ return clusterInfo.getClusterMetrics();
+ }
+
+ @Override
+ public void stop() {
+ if (this.server != null) {
+ this.server.close();
+ }
+ super.stop();
+ }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMConfig.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class RMConfig {
+ public static final String RM_KEYTAB = YarnConfiguration.RM_PREFIX
+ + "keytab";
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,227 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+
+import java.io.IOException;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.SyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+
+/**
+ * The ResourceManager is the main class that is a set of components.
+ *
+ */
+public class ResourceManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(ResourceManager.class);
+ public static final long clusterTimeStamp = System.currentTimeMillis();
+ private YarnConfiguration conf;
+
+ private ApplicationsManagerImpl applicationsManager;
+
+ private ContainerTokenSecretManager containerTokenSecretManager =
+ new ContainerTokenSecretManager();
+
+ private ApplicationTokenSecretManager appTokenSecretManager =
+ new ApplicationTokenSecretManager();
+
+
+ private ResourceScheduler scheduler;
+ private RMResourceTrackerImpl rmResourceTracker;
+ private ClientRMService clientRM;
+ private ApplicationMasterService masterService;
+ private Boolean shutdown = false;
+ private WebApp webApp;
+ private final ASMContext asmContext;
+
+ public ResourceManager() {
+ super("ResourceManager");
+ this.asmContext = new ASMContextImpl();
+ }
+
+
+ public interface ASMContext {
+ public SyncDispatcher getDispatcher();
+ }
+
+ public static class ASMContextImpl implements ASMContext {
+ private final SyncDispatcher asmEventDispatcher;
+
+ public ASMContextImpl() {
+ this.asmEventDispatcher = new SyncDispatcher();
+ }
+
+ @Override
+ public SyncDispatcher getDispatcher() {
+ return this.asmEventDispatcher;
+ }
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ // Initialize the config
+ this.conf = new YarnConfiguration(conf);
+ // Initialize the scheduler
+ this.scheduler =
+ ReflectionUtils.newInstance(
+ conf.getClass(YarnConfiguration.RESOURCE_SCHEDULER,
+ FifoScheduler.class, ResourceScheduler.class),
+ this.conf);
+ this.scheduler.reinitialize(this.conf, this.containerTokenSecretManager);
+
+ //TODO change this to be random
+ this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
+ .createSecretKey("Dummy".getBytes()));
+
+ applicationsManager = createApplicationsManagerImpl();
+ addService(applicationsManager);
+
+ rmResourceTracker = createRMResourceTracker();
+ rmResourceTracker.register(this.scheduler);
+ addService(rmResourceTracker);
+
+ clientRM = createClientRMService();
+ addService(clientRM);
+
+ masterService = createApplicationMasterService();
+ addService(masterService) ;
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+
+ try {
+ doSecureLogin();
+ } catch(IOException ie) {
+ throw new AvroRuntimeException("Failed to login", ie);
+ }
+ super.start();
+
+ webApp = WebApps.$for("yarn", masterService).at(
+ conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_WEBAPP_BIND_ADDRESS)).
+ start(new RMWebApp(this));
+ }
+
+ public void run() {
+ synchronized(shutdown) {
+ try {
+ shutdown.wait();
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted while waiting", ie);
+ }
+ }
+ }
+
+ protected void doSecureLogin() throws IOException {
+ SecurityUtil.login(conf, RMConfig.RM_KEYTAB,
+ YarnConfiguration.RM_SERVER_PRINCIPAL_KEY);
+ }
+
+ @Override
+ public void stop() {
+ if (webApp != null) {
+ webApp.stop();
+ }
+
+ synchronized(shutdown) {
+ shutdown = true;
+ shutdown.notify();
+ }
+ super.stop();
+ }
+
+ protected RMResourceTrackerImpl createRMResourceTracker() {
+ return new RMResourceTrackerImpl(this.containerTokenSecretManager);
+ }
+
+ protected ApplicationsManagerImpl createApplicationsManagerImpl() {
+ return new ApplicationsManagerImpl(
+ this.appTokenSecretManager, this.scheduler, this.asmContext);
+ }
+
+ protected ClientRMService createClientRMService() {
+ return new ClientRMService(applicationsManager, rmResourceTracker);
+ }
+
+ protected ApplicationMasterService createApplicationMasterService() {
+ return new ApplicationMasterService(
+ this.appTokenSecretManager, applicationsManager, scheduler, this.asmContext);
+ }
+
+ /**
+ * return applications manager.
+ * @return
+ */
+ public ApplicationsManager getApplicationsManager() {
+ return applicationsManager;
+ }
+
+ /**
+ * return the scheduler.
+ * @return
+ */
+ public ResourceScheduler getResourceScheduler() {
+ return this.scheduler;
+ }
+
+ /**
+ * return the resource tracking component.
+ * @return
+ */
+ public RMResourceTrackerImpl getResourceTracker() {
+ return this.rmResourceTracker;
+ }
+
+
+ public static void main(String argv[]) {
+ ResourceManager resourceManager = null;
+ try {
+ Configuration conf = new YarnConfiguration();
+ resourceManager = new ResourceManager();
+ resourceManager.init(conf);
+ resourceManager.start();
+ resourceManager.run();
+ } catch (Exception e) {
+ LOG.error("Error starting RM", e);
+ } finally {
+ if (resourceManager != null) {
+ resourceManager.stop();
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,261 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.AMLauncherEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+
+public class AMLauncher implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(AMLauncher.class);
+
+ private ContainerManager containerMgrProxy;
+
+ private final AppContext master;
+ private final Configuration conf;
+ private ApplicationTokenSecretManager applicationTokenSecretManager;
+ private ClientToAMSecretManager clientToAMSecretManager;
+ private AMLauncherEventType event;
+
+ @SuppressWarnings("rawtypes")
+ private EventHandler handler;
+
+ @SuppressWarnings("unchecked")
+ public AMLauncher(ASMContext asmContext, AppContext master,
+ AMLauncherEventType event,ApplicationTokenSecretManager applicationTokenSecretManager,
+ ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
+ this.master = master;
+ this.conf = new Configuration(conf); // Just not to touch the sec-info class
+ this.applicationTokenSecretManager = applicationTokenSecretManager;
+ this.clientToAMSecretManager = clientToAMSecretManager;
+ this.conf.setClass(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+ ContainerManagerSecurityInfo.class, SecurityInfo.class);
+ this.event = event;
+ this.handler = asmContext.getDispatcher().getEventHandler();
+ }
+
+ private void connect() throws IOException {
+ ContainerID masterContainerID = master.getMasterContainer().id;
+ containerMgrProxy =
+ getContainerMgrProxy(masterContainerID.appID);
+ }
+
+ private void launch() throws IOException {
+ connect();
+ ContainerID masterContainerID = master.getMasterContainer().id;
+ ApplicationSubmissionContext applicationContext =
+ master.getSubmissionContext();
+ LOG.info("Setting up container " + master.getMasterContainer()
+ + " for AM " + master.getMaster());
+ ContainerLaunchContext launchContext =
+ getLaunchSpec(applicationContext, masterContainerID);
+ containerMgrProxy.startContainer(launchContext);
+ LOG.info("Done launching container " + master.getMasterContainer()
+ + " for AM " + master.getMaster());
+ }
+
+ private void cleanup() throws IOException {
+ connect();
+ ContainerID containerId = master.getMasterContainer().id;
+ containerMgrProxy.stopContainer(containerId);
+ containerMgrProxy.cleanupContainer(containerId);
+ }
+
+ private ContainerManager getContainerMgrProxy(
+ final ApplicationID applicationID) throws IOException {
+
+ Container container = master.getMasterContainer();
+
+ final String containerManagerBindAddress = container.hostName.toString();
+
+ final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
+
+ UserGroupInformation currentUser =
+ UserGroupInformation.createRemoteUser("TODO"); // TODO
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ContainerToken containerToken = container.containerToken;
+ Token<ContainerTokenIdentifier> token =
+ new Token<ContainerTokenIdentifier>(
+ containerToken.identifier.array(),
+ containerToken.password.array(), new Text(
+ containerToken.kind.toString()), new Text(
+ containerToken.service.toString()));
+ currentUser.addToken(token);
+ }
+ return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
+ @Override
+ public ContainerManager run() {
+ return (ContainerManager) rpc.getProxy(ContainerManager.class,
+ NetUtils.createSocketAddr(containerManagerBindAddress), conf);
+ }
+ });
+ }
+
+ private ContainerLaunchContext getLaunchSpec(
+ ApplicationSubmissionContext applicationMasterContext,
+ ContainerID containerID) throws IOException {
+
+ // Construct the actual Container
+ ContainerLaunchContext container = new ContainerLaunchContext();
+ container.command = applicationMasterContext.command;
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : container.command) {
+ mergedCommand.append(str).append(" ");
+ }
+ LOG.info("Command to launch container " +
+ containerID + " : " + mergedCommand);
+ container.env = applicationMasterContext.environment;
+
+ container.env.putAll(setupTokensInEnv(applicationMasterContext));
+
+ // Construct the actual Container
+ container.id = containerID;
+ container.user = applicationMasterContext.user;
+ container.resource = applicationMasterContext.masterCapability;
+ container.resources = applicationMasterContext.resources_todo;
+ container.containerTokens = applicationMasterContext.fsTokens_todo;
+ return container;
+ }
+
+ private Map<CharSequence, CharSequence> setupTokensInEnv(
+ ApplicationSubmissionContext asc)
+ throws IOException {
+ Map<CharSequence, CharSequence> env =
+ new HashMap<CharSequence, CharSequence>();
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // TODO: Security enabled/disabled info should come from RM.
+
+ Credentials credentials = new Credentials();
+
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ if (asc.fsTokens_todo != null) {
+ // TODO: Don't do this kind of checks everywhere.
+ dibb.reset(asc.fsTokens_todo);
+ credentials.readTokenStorageStream(dibb);
+ }
+
+ ApplicationTokenIdentifier id =
+ new ApplicationTokenIdentifier(master.getMasterContainer().id.appID);
+ Token<ApplicationTokenIdentifier> token =
+ new Token<ApplicationTokenIdentifier>(id,
+ this.applicationTokenSecretManager);
+ String schedulerAddressStr =
+ this.conf.get(YarnConfiguration.SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+ InetSocketAddress unresolvedAddr =
+ NetUtils.createSocketAddr(schedulerAddressStr);
+ String resolvedAddr =
+ unresolvedAddr.getAddress().getHostAddress() + ":"
+ + unresolvedAddr.getPort();
+ token.setService(new Text(resolvedAddr));
+ String appMasterTokenEncoded = token.encodeToUrlString();
+ LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
+ env.put(YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME,
+ appMasterTokenEncoded);
+
+ // Add the RM token
+ credentials.addToken(new Text(resolvedAddr), token);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ asc.fsTokens_todo = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+ ApplicationTokenIdentifier identifier =
+ new ApplicationTokenIdentifier(
+ this.master.getMaster().applicationId);
+ SecretKey clientSecretKey =
+ this.clientToAMSecretManager.getMasterKey(identifier);
+ String encoded =
+ Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
+ LOG.debug("The encoded client secret-key to be put in env : " + encoded);
+ env.put(YarnConfiguration.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded);
+ }
+ return env;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ switch (event) {
+ case LAUNCH:
+ try {
+ LOG.info("Launching master" + master.getMaster());
+ launch();
+ } catch(IOException ie) {
+ LOG.info("Error launching ", ie);
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FAILED, master));
+ }
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.LAUNCHED, master));
+ break;
+ case CLEANUP:
+ try {
+ LOG.info("Cleaning master " + master.getMaster());
+ cleanup();
+ } catch(IOException ie) {
+ LOG.info("Error cleaning master ", ie);
+ }
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH, master));
+ break;
+ default:
+ break;
+ }
+ }
+
+ public AppContext getApplicationContext() {
+ return master;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,351 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * This class tracks the application masters that are running. It tracks
+ * heartbeats from application master to see if it needs to expire some application
+ * master.
+ */
+@Evolving
+@Private
+public class AMTracker extends AbstractService implements EventHandler<ASMEvent
+ <ApplicationEventType>> {
+ private static final Log LOG = LogFactory.getLog(AMTracker.class);
+ private HeartBeatThread heartBeatThread;
+ private long amExpiryInterval;
+ @SuppressWarnings("rawtypes")
+ private EventHandler handler;
+
+ private final ASMContext asmContext;
+
+ private final Map<ApplicationID, ApplicationMasterInfo> applications =
+ new ConcurrentHashMap<ApplicationID, ApplicationMasterInfo>();
+
+ private TreeSet<ApplicationStatus> amExpiryQueue =
+ new TreeSet<ApplicationStatus>(
+ new Comparator<ApplicationStatus>() {
+ public int compare(ApplicationStatus p1, ApplicationStatus p2) {
+ if (p1.lastSeen < p2.lastSeen) {
+ return -1;
+ } else if (p1.lastSeen > p2.lastSeen) {
+ return 1;
+ } else {
+ return (p1.applicationId.id -
+ p2.applicationId.id);
+ }
+ }
+ }
+ );
+
+ public AMTracker(ASMContext asmContext) {
+ super(AMTracker.class.getName());
+ this.heartBeatThread = new HeartBeatThread();
+ this.asmContext = asmContext;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ this.handler = asmContext.getDispatcher().getEventHandler();
+ this.amExpiryInterval = conf.getLong(YarnConfiguration.AM_EXPIRY_INTERVAL,
+ YarnConfiguration.DEFAULT_AM_EXPIRY_INTERVAL);
+ this.asmContext.getDispatcher().register(ApplicationEventType.class, this);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ heartBeatThread.start();
+ }
+
+ /**
+ * This class runs continuosly to track the application masters
+ * that might be dead.
+ */
+ private class HeartBeatThread extends Thread {
+ private volatile boolean stop = false;
+
+ public HeartBeatThread() {
+ super("ApplicationsManager:" + HeartBeatThread.class.getName());
+ }
+
+ @Override
+ public void run() {
+ /* the expiry queue does not need to be in sync with applications,
+ * if an applications in the expiry queue cannot be found in applications
+ * its alright. We do not want to hold a hold on applications while going
+ * through the expiry queue.
+ */
+ List<ApplicationID> expired = new ArrayList<ApplicationID>();
+ while (!stop) {
+ ApplicationStatus leastRecent;
+ long now = System.currentTimeMillis();
+ expired.clear();
+ synchronized(amExpiryQueue) {
+ while ((amExpiryQueue.size() > 0) &&
+ (leastRecent = amExpiryQueue.first()) != null &&
+ ((now - leastRecent.lastSeen) >
+ amExpiryInterval)) {
+ amExpiryQueue.remove(leastRecent);
+ ApplicationMasterInfo info;
+ synchronized(applications) {
+ info = applications.get(leastRecent.applicationId);
+ }
+ if (info == null) {
+ continue;
+ }
+ ApplicationStatus status = info.getStatus();
+ if ((now - status.lastSeen) > amExpiryInterval) {
+ expired.add(status.applicationId);
+ } else {
+ amExpiryQueue.add(status);
+ }
+ }
+ }
+ expireAMs(expired);
+ }
+ }
+
+ public void shutdown() {
+ stop = true;
+ }
+ }
+
+ protected void expireAMs(List<ApplicationID> toExpire) {
+ for (ApplicationID app: toExpire) {
+ ApplicationMasterInfo am = null;
+ synchronized (applications) {
+ am = applications.get(app);
+ }
+
+ handler.handle(new ASMEvent<ApplicationEventType>
+ (ApplicationEventType.EXPIRE, am));
+ }
+ }
+
+ @Override
+ public void stop() {
+ heartBeatThread.interrupt();
+ heartBeatThread.shutdown();
+ try {
+ heartBeatThread.join(1000);
+ } catch (InterruptedException ie) {
+ LOG.info(heartBeatThread.getName() + " interrupted during join ",
+ ie); }
+ super.stop();
+ }
+
+ public void addMaster(String user, ApplicationSubmissionContext
+ submissionContext, String clientToken) {
+ ApplicationMasterInfo applicationMaster = new ApplicationMasterInfo(handler,
+ user, submissionContext, clientToken);
+ synchronized(applications) {
+ applications.put(applicationMaster.getApplicationID(), applicationMaster);
+ }
+ /* initiate the launching cycle for the AM */
+ handler.handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.ALLOCATE, applicationMaster));
+ }
+
+ public void finish(ApplicationID application) {
+ ApplicationMasterInfo masterInfo = null;
+ synchronized(applications) {
+ masterInfo = applications.get(application);
+ }
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.FINISH,
+ masterInfo));
+ }
+
+ public ApplicationMasterInfo get(ApplicationID applicationId) {
+ ApplicationMasterInfo masterInfo = null;
+ synchronized (applications) {
+ masterInfo = applications.get(applicationId);
+ }
+ return masterInfo;
+ }
+
+ public void remove(ApplicationID applicationId) {
+ synchronized (applications) {
+ applications.remove(applicationId);
+ }
+ }
+
+ public synchronized List<AppContext> getAllApplications() {
+ List<AppContext> allAMs = new ArrayList<AppContext>();
+ synchronized (applications) {
+ for ( ApplicationMasterInfo val: applications.values()) {
+ allAMs.add(val);
+ }
+ }
+ return allAMs;
+ }
+
+ private void addForTracking(AppContext master) {
+ LOG.info("Adding application master for tracking " + master.getMaster());
+ synchronized (amExpiryQueue) {
+ amExpiryQueue.add(master.getStatus());
+ }
+ }
+
+ public void kill(ApplicationID applicationID) {
+ ApplicationMasterInfo masterInfo = null;
+
+ synchronized(applications) {
+ masterInfo = applications.get(applicationID);
+ }
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.KILL,
+ masterInfo));
+ }
+
+ /*
+ * this class is used for passing status context to the application state
+ * machine.
+ */
+ private static class TrackerAppContext implements AppContext {
+ private final ApplicationID appID;
+ private final ApplicationMaster master;
+ private final UnsupportedOperationException notimplemented;
+
+ public TrackerAppContext(
+ ApplicationID appId, ApplicationMaster master) {
+ this.appID = appId;
+ this.master = master;
+ this.notimplemented = new NotImplementedException();
+ }
+
+ @Override
+ public ApplicationSubmissionContext getSubmissionContext() {
+ throw notimplemented;
+ }
+ @Override
+ public Resource getResource() {
+ throw notimplemented;
+ }
+ @Override
+ public ApplicationID getApplicationID() {
+ return appID;
+ }
+ @Override
+ public ApplicationStatus getStatus() {
+ return master.status;
+ }
+ @Override
+ public ApplicationMaster getMaster() {
+ return master;
+ }
+ @Override
+ public Container getMasterContainer() {
+ throw notimplemented;
+ }
+ @Override
+ public String getUser() {
+ throw notimplemented;
+ }
+ @Override
+ public long getLastSeen() {
+ return master.status.lastSeen;
+ }
+ @Override
+ public String getName() {
+ throw notimplemented;
+ }
+ @Override
+ public String getQueue() {
+ throw notimplemented;
+ }
+ }
+
+ public void heartBeat(ApplicationStatus status) {
+ ApplicationMaster master = new ApplicationMaster();
+ master.status = status;
+ master.applicationId = status.applicationId;
+ TrackerAppContext context = new TrackerAppContext(status.applicationId, master);
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.STATUSUPDATE,
+ context));
+ }
+
+ public void registerMaster(ApplicationMaster applicationMaster) {
+ applicationMaster.status.lastSeen = System.currentTimeMillis();
+ ApplicationMasterInfo master = null;
+ synchronized(applications) {
+ master = applications.get(applicationMaster.applicationId);
+ }
+ LOG.info("AM registration " + master.getMaster());
+ TrackerAppContext registrationContext = new TrackerAppContext(
+ master.getApplicationID(), applicationMaster);
+ handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.
+ REGISTERED, registrationContext));
+ }
+
+ @Override
+ public void handle(ASMEvent<ApplicationEventType> event) {
+ ApplicationID appID = event.getAppContext().getApplicationID();
+ ApplicationMasterInfo masterInfo = null;
+ synchronized(applications) {
+ masterInfo = applications.get(appID);
+ }
+ try {
+ masterInfo.handle(event);
+ } catch(Throwable t) {
+ LOG.error("Error in handling event type " + event.getType() + " for application "
+ + event.getAppContext().getApplicationID());
+ }
+ /* we need to launch the applicaiton master on allocated transition */
+ if (masterInfo.getState() == ApplicationState.ALLOCATED) {
+ handler.handle(new ASMEvent<ApplicationEventType>(
+ ApplicationEventType.LAUNCH, masterInfo));
+ }
+ if (masterInfo.getState() == ApplicationState.LAUNCHED) {
+ /* the application move to a launched state start tracking */
+ synchronized (amExpiryQueue) {
+ LOG.info("DEBUG -- adding to expiry " + masterInfo.getStatus() +
+ " currenttime " + System.currentTimeMillis());
+ amExpiryQueue.add(masterInfo.getStatus());
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.Resource;
+
+/**
+ * The context of an application.
+ *
+ */
+public interface AppContext {
+
+ /**
+ * the application submission context for this application.
+ * @return the {@link ApplicationSubmissionContext} for the submitted
+ * application.
+ */
+ public ApplicationSubmissionContext getSubmissionContext();
+
+ /**
+ * get the resource required for the application master.
+ * @return the resource requirements of the application master
+ */
+ public Resource getResource();
+
+ /**
+ * get the application ID for this application
+ * @return the application id for this application
+ */
+ public ApplicationID getApplicationID();
+
+ /**
+ * get the status of the application
+ * @return the {@link ApplicationStatus} of this application
+ */
+ public ApplicationStatus getStatus();
+
+ /**
+ * the application master for this application.
+ * @return the {@link ApplicationMaster} for this application
+ */
+ public ApplicationMaster getMaster();
+
+ /**
+ * the container on which the application master is running.
+ * @return the container for running the application master.
+ */
+ public Container getMasterContainer();
+
+ /**
+ * the user for this application
+ * @return the user for this application
+ */
+ public String getUser();
+
+ /**
+ * The last time the RM heard from this application
+ * @return the last time RM heard from this application.
+ */
+ public long getLastSeen();
+
+ /**
+ * the name for this application
+ * @return the application name.
+ */
+ public String getName();
+
+ /**
+ * The queue of this application.
+ * @return the queue for this application
+ */
+ public String getQueue();
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+
+/**
+ * Interface the application master use for application master status
+ * and registeration.
+ */
+@Private
+@Evolving
+public interface ApplicationMasterHandler {
+ void registerApplicationMaster(ApplicationMaster applicationMaster)
+ throws IOException ;
+
+ void applicationHeartbeat(ApplicationStatus status) throws IOException;
+
+ void finishApplicationMaster(ApplicationMaster applicationMaster)
+ throws IOException;
+}
\ No newline at end of file