You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/01 00:23:34 UTC
svn commit: r1087462 [17/20] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/...
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Thu Mar 31 22:23:22 2011
@@ -23,14 +23,10 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
import junit.framework.Assert;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
@@ -39,18 +35,25 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.ContainerStatus;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
-import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -69,13 +72,14 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.service.Service.STATE;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestContainerManager {
+ private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
static {
DefaultMetricsSystem.setMiniClusterMode(true);
}
@@ -105,13 +109,13 @@ public class TestContainerManager {
private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
context, dispatcher, healthChecker) {
@Override
- protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+ protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};
@Override
protected void startStatusUpdater() throws InterruptedException,
- AvroRemoteException {
+ YarnRemoteException {
return; // Don't start any updating thread.
}
};
@@ -167,8 +171,10 @@ public class TestContainerManager {
// Just do a query for a non-existing container.
boolean throwsException = false;
try {
- containerManager.getContainerStatus(new ContainerID());
- } catch (AvroRemoteException e) {
+ GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ request.setContainerId(recordFactory.newRecordInstance(ContainerId.class));
+ containerManager.getContainerStatus(request);
+ } catch (YarnRemoteException e) {
throwsException = true;
}
Assert.assertTrue(throwsException);
@@ -187,45 +193,46 @@ public class TestContainerManager {
fileWriter.write("Hello World!");
fileWriter.close();
- ContainerLaunchContext container = new ContainerLaunchContext();
+ ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
// ////// Construct the Container-id
- ApplicationID appId = new ApplicationID();
- ContainerID cId = new ContainerID();
- cId.appID = appId;
- container.id = cId;
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+ cId.setAppId(appId);
+ container.setContainerId(cId);
- container.user = user;
+ container.setUser(user);
// ////// Construct the container-spec.
- ContainerLaunchContext containerLaunchContext =
- new ContainerLaunchContext();
- containerLaunchContext.resources =
- new HashMap<CharSequence, LocalResource>();
+ ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+// containerLaunchContext.resources = new HashMap<CharSequence, LocalResource>();
URL resource_alpha =
- AvroUtil.getYarnUrlFromPath(localFS
+ ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(file.getAbsolutePath())));
- LocalResource rsrc_alpha = new LocalResource();
- rsrc_alpha.resource = resource_alpha;
- rsrc_alpha.size= -1;
- rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
- rsrc_alpha.type = LocalResourceType.FILE;
- rsrc_alpha.timestamp = file.lastModified();
+ LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(file.lastModified());
String destinationFile = "dest_file";
- containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.user = container.user;
- containerLaunchContext.id = container.id;
- containerLaunchContext.command = new ArrayList<CharSequence>();
+ containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ containerLaunchContext.setUser(container.getUser());
+ containerLaunchContext.setContainerId(container.getContainerId());
+// containerLaunchContext.command = new ArrayList<CharSequence>();
- containerManager.startContainer(containerLaunchContext);
+ StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+
+ containerManager.startContainer(startRequest);
DummyContainerManager.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
// Now ascertain that the resources are localised correctly.
// TODO: Don't we need clusterStamp in localDir?
- String appIDStr = AvroUtil.toString(appId);
- String containerIDStr = AvroUtil.toString(cId);
+ String appIDStr = ConverterUtils.toString(appId);
+ String containerIDStr = ConverterUtils.toString(cId);
File userCacheDir = new File(localDir, ApplicationLocalizer.USERCACHE);
File userDir = new File(userCacheDir, user);
File appCache = new File(userDir, ApplicationLocalizer.APPCACHE);
@@ -269,36 +276,34 @@ public class TestContainerManager {
fileWriter.write("\nsleep 100");
fileWriter.close();
- ContainerLaunchContext containerLaunchContext =
- new ContainerLaunchContext();
+ ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
// ////// Construct the Container-id
- ApplicationID appId = new ApplicationID();
- ContainerID cId = new ContainerID();
- cId.appID = appId;
- containerLaunchContext.id = cId;
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+ cId.setAppId(appId);
+ containerLaunchContext.setContainerId(cId);
- containerLaunchContext.user = user;
+ containerLaunchContext.setUser(user);
- containerLaunchContext.resources =
- new HashMap<CharSequence, LocalResource>();
+// containerLaunchContext.resources =new HashMap<CharSequence, LocalResource>();
URL resource_alpha =
- AvroUtil.getYarnUrlFromPath(localFS
+ ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
- LocalResource rsrc_alpha = new LocalResource();
- rsrc_alpha.resource = resource_alpha;
- rsrc_alpha.size= -1;
- rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
- rsrc_alpha.type = LocalResourceType.FILE;
- rsrc_alpha.timestamp = scriptFile.lastModified();
+ LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
- containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.user = containerLaunchContext.user;
- List<CharSequence> commandArgs = new ArrayList<CharSequence>();
- commandArgs.add("/bin/bash");
- commandArgs.add(scriptFile.getAbsolutePath());
- containerLaunchContext.command = commandArgs;
- containerManager.startContainer(containerLaunchContext);
+ containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ containerLaunchContext.setUser(containerLaunchContext.getUser());
+ containerLaunchContext.addCommand("/bin/bash");
+ containerLaunchContext.addCommand(scriptFile.getAbsolutePath());
+ StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ startRequest.setContainerLaunchContext(containerLaunchContext);
+ containerManager.startContainer(startRequest);
int timeoutSecs = 0;
while (!processStartFile.exists() && timeoutSecs++ < 20) {
@@ -328,13 +333,18 @@ public class TestContainerManager {
exec.signalContainer(user,
pid, Signal.NULL));
- containerManager.stopContainer(cId);
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(cId);
+ containerManager.stopContainer(stopRequest);
DummyContainerManager.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- ContainerStatus containerStatus = containerManager.getContainerStatus(cId);
+
+ GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ gcsRequest.setContainerId(cId);
+ ContainerStatus containerStatus = containerManager.getContainerStatus(gcsRequest).getStatus();
Assert.assertEquals(ExitCode.KILLED.getExitCode(),
- containerStatus.exitStatus);
+ containerStatus.getExitStatus());
// Assert that the process is not alive anymore
Assert.assertFalse("Process is still alive!",
@@ -360,47 +370,48 @@ public class TestContainerManager {
fileWriter.write("Hello World!");
fileWriter.close();
- ContainerLaunchContext container = new ContainerLaunchContext();
+ ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
// ////// Construct the Container-id
- ApplicationID appId = new ApplicationID();
- ContainerID cId = new ContainerID();
- cId.appID = appId;
- container.id = cId;
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
+ cId.setAppId(appId);
+ container.setContainerId(cId);
- container.user = user;
+ container.setUser(user);
// ////// Construct the container-spec.
- ContainerLaunchContext containerLaunchContext =
- new ContainerLaunchContext();
- containerLaunchContext.resources =
- new HashMap<CharSequence, LocalResource>();
+ ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+// containerLaunchContext.resources =
+// new HashMap<CharSequence, LocalResource>();
URL resource_alpha =
- AvroUtil.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
+ ConverterUtils.getYarnUrlFromPath(FileContext.getLocalFSFileContext()
.makeQualified(new Path(file.getAbsolutePath())));
- LocalResource rsrc_alpha = new LocalResource();
- rsrc_alpha.resource = resource_alpha;
- rsrc_alpha.size = -1;
- rsrc_alpha.state = LocalResourceVisibility.APPLICATION;
- rsrc_alpha.type = LocalResourceType.FILE;
- rsrc_alpha.timestamp = file.lastModified();
+ LocalResource rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(file.lastModified());
String destinationFile = "dest_file";
- containerLaunchContext.resources.put(destinationFile, rsrc_alpha);
- containerLaunchContext.user = container.user;
- containerLaunchContext.id = container.id;
- containerLaunchContext.command = new ArrayList<CharSequence>();
-
- containerManager.startContainer(containerLaunchContext);
+ containerLaunchContext.setLocalResource(destinationFile, rsrc_alpha);
+ containerLaunchContext.setUser(container.getUser());
+ containerLaunchContext.setContainerId(container.getContainerId());
+// containerLaunchContext.command = new ArrayList<CharSequence>();
+
+ StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+ request.setContainerLaunchContext(containerLaunchContext);
+ containerManager.startContainer(request);
DummyContainerManager.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- waitForApplicationState(containerManager, cId.appID,
+ waitForApplicationState(containerManager, cId.getAppId(),
ApplicationState.RUNNING);
// Now ascertain that the resources are localised correctly.
- String appIDStr = AvroUtil.toString(appId);
- String containerIDStr = AvroUtil.toString(cId);
+ String appIDStr = ConverterUtils.toString(appId);
+ String containerIDStr = ConverterUtils.toString(cId);
File userCacheDir = new File(localDir, ApplicationLocalizer.USERCACHE);
File userDir = new File(userCacheDir, user);
File appCache = new File(userDir, ApplicationLocalizer.APPCACHE);
@@ -425,9 +436,9 @@ public class TestContainerManager {
// Simulate RM sending an AppFinish event.
containerManager.handle(new CMgrCompletedAppsEvent(Arrays
- .asList(new ApplicationID[] { appId })));
+ .asList(new ApplicationId[] { appId })));
- waitForApplicationState(containerManager, cId.appID,
+ waitForApplicationState(containerManager, cId.getAppId(),
ApplicationState.FINISHED);
// Now ascertain that the resources are localised correctly.
@@ -546,7 +557,7 @@ public class TestContainerManager {
// }
static void waitForApplicationState(ContainerManagerImpl containerManager,
- ApplicationID appID, ApplicationState finalState)
+ ApplicationId appID, ApplicationState finalState)
throws InterruptedException {
// Wait for app-finish
Application app =
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestApplicationLocalizer.java Thu Mar 31 22:23:22 2011
@@ -41,15 +41,17 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalizationProtocol;
-import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.yarn.api.records.URL;
-import static org.apache.hadoop.yarn.LocalResourceType.*;
-import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceVisibility.*;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -66,6 +68,7 @@ public class TestApplicationLocalizer {
FsPermission.createImmutable((short) 0700);
private static final FsPermission urwx_gx =
FsPermission.createImmutable((short) 0710);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
static DataInputBuffer createFakeCredentials(Random r, int nTok)
throws IOException {
@@ -90,34 +93,34 @@ public class TestApplicationLocalizer {
return ret;
}
- static Collection<LocalResource> createFakeResources(Random r, int nFiles,
- Map<Long,LocalResource> sizes) throws IOException {
- ArrayList<LocalResource> rsrc = new ArrayList<LocalResource>();
+ static Collection<org.apache.hadoop.yarn.api.records.LocalResource> createFakeResources(Random r, int nFiles,
+ Map<Long,org.apache.hadoop.yarn.api.records.LocalResource> sizes) throws IOException {
+ ArrayList<org.apache.hadoop.yarn.api.records.LocalResource> rsrc = new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>();
long basetime = r.nextLong() >>> 2;
for (int i = 0; i < nFiles; ++i) {
- LocalResource resource = new LocalResource();
- URL path = new URL();
- path.scheme = "file";
- path.host = null;
- path.port = 0;
- resource.timestamp = basetime + i;
- r.setSeed(resource.timestamp);
+ org.apache.hadoop.yarn.api.records.LocalResource resource = recordFactory.newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+ URL path = recordFactory.newRecordInstance(URL.class);
+ path.setScheme("file");
+// path.host = null;
+ path.setPort(0);
+ resource.setTimestamp(basetime + i);
+ r.setSeed(resource.getTimestamp());
sizes.put(r.nextLong() & Long.MAX_VALUE, resource);
StringBuilder sb = new StringBuilder("/" + r.nextLong());
while (r.nextInt(2) == 1) {
sb.append("/" + r.nextLong());
}
- path.file = sb.toString();
- resource.resource = path;
- resource.size = -1;
- resource.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
- resource.state = PRIVATE;
+ path.setFile(sb.toString());
+ resource.setResource(path);
+ resource.setSize(-1);
+ resource.setType(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+ resource.setVisibility(PRIVATE);
rsrc.add(resource);
}
return rsrc;
}
- static DataInputBuffer writeFakeAppFiles(Collection<LocalResource> rsrc)
+ static DataInputBuffer writeFakeAppFiles(Collection<org.apache.hadoop.yarn.api.records.LocalResource> rsrc)
throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
ApplicationLocalizer.writeResourceDescription(dob, rsrc);
@@ -156,8 +159,8 @@ public class TestApplicationLocalizer {
// return file stream instead of opening local file
r.setSeed(seed);
System.out.println("SEED: " + seed);
- final HashMap<Long,LocalResource> sizes = new HashMap<Long,LocalResource>();
- Collection<LocalResource> resources = createFakeResources(r, 10, sizes);
+ final HashMap<Long,org.apache.hadoop.yarn.api.records.LocalResource> sizes = new HashMap<Long,org.apache.hadoop.yarn.api.records.LocalResource>();
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> resources = createFakeResources(r, 10, sizes);
DataInputBuffer appFiles = writeFakeAppFiles(resources);
Path filesPath =
lfs.makeQualified(new Path(ApplicationLocalizer.FILECACHE_FILE));
@@ -176,7 +179,7 @@ public class TestApplicationLocalizer {
// set to return mocks
doReturn(mockLocalization).when(spyLocalizer).getProxy(nmAddr);
- for (Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
+ for (Map.Entry<Long,org.apache.hadoop.yarn.api.records.LocalResource> rsrc : sizes.entrySet()) {
doReturn(new FalseDownload(rsrc.getValue(), rsrc.getKey())
).when(spyLocalizer).download(Matchers.<LocalDirAllocator>anyObject(),
argThat(new LocalResourceMatches(rsrc.getValue())));
@@ -186,27 +189,27 @@ public class TestApplicationLocalizer {
// verify app files opened
verify(spylfs).open(tokenPath);
verify(spylfs).open(filesPath);
- ArgumentMatcher<CharSequence> userMatch =
- new ArgumentMatcher<CharSequence>() {
+ ArgumentMatcher<String> userMatch =
+ new ArgumentMatcher<String>() {
@Override
public boolean matches(Object o) {
return "yak".equals(o.toString());
}
};
- for (final Map.Entry<Long,LocalResource> rsrc : sizes.entrySet()) {
- ArgumentMatcher<LocalResource> localizedMatch =
- new ArgumentMatcher<LocalResource>() {
+ for (final Map.Entry<Long,org.apache.hadoop.yarn.api.records.LocalResource> rsrc : sizes.entrySet()) {
+ ArgumentMatcher<org.apache.hadoop.yarn.api.records.LocalResource> localizedMatch =
+ new ArgumentMatcher<org.apache.hadoop.yarn.api.records.LocalResource>() {
@Override
public boolean matches(Object o) {
- LocalResource other = (LocalResource) o;
- r.setSeed(rsrc.getValue().timestamp);
- boolean ret = (r.nextLong() & Long.MAX_VALUE) == other.size;
+ org.apache.hadoop.yarn.api.records.LocalResource other = (org.apache.hadoop.yarn.api.records.LocalResource) o;
+ r.setSeed(rsrc.getValue().getTimestamp());
+ boolean ret = (r.nextLong() & Long.MAX_VALUE) == other.getSize();
StringBuilder sb = new StringBuilder("/" + r.nextLong());
while (r.nextInt(2) == 1) {
sb.append("/" + r.nextLong());
}
- ret &= other.resource.file.toString().equals(sb.toString());
- ret &= other.type.equals(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+ ret &= other.getResource().getFile().equals(sb.toString());
+ ret &= other.getType().equals(r.nextInt(2) == 1 ? FILE : ARCHIVE);
return ret;
}
};
@@ -214,39 +217,71 @@ public class TestApplicationLocalizer {
new ArgumentMatcher<URL>() {
@Override
public boolean matches(Object o) {
- r.setSeed(rsrc.getValue().timestamp);
- return ((URL)o).file.toString().equals(
+ r.setSeed(rsrc.getValue().getTimestamp());
+ return ((URL)o).getFile().equals(
"/done/" + (r.nextLong() & Long.MAX_VALUE));
}
};
- verify(mockLocalization).successfulLocalization(
- argThat(userMatch), argThat(localizedMatch), argThat(dstMatch));
+
+ ArgumentMatcher<SuccessfulLocalizationRequest> sucLocMatch = new ArgumentMatcher<SuccessfulLocalizationRequest>() {
+
+ @Override
+ public boolean matches(Object o) {
+ SuccessfulLocalizationRequest req = (SuccessfulLocalizationRequest)o;
+
+ //UserMatch
+ String user = req.getUser();
+ boolean retUser = "yak".equals(user.toString());
+
+ //LocalResourceMatch
+ org.apache.hadoop.yarn.api.records.LocalResource other = req.getResource();
+ r.setSeed(rsrc.getValue().getTimestamp());
+ boolean retLocalResource = (r.nextLong() & Long.MAX_VALUE) == other.getSize();
+ StringBuilder sb = new StringBuilder("/" + r.nextLong());
+ while (r.nextInt(2) == 1) {
+ sb.append("/" + r.nextLong());
+ }
+ retLocalResource &= other.getResource().getFile().equals(sb.toString());
+ retLocalResource &= other.getType().equals(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+
+ //Path Match
+ URL url = req.getPath();
+ r.setSeed(rsrc.getValue().getTimestamp());
+ boolean retUrl = (url.getFile().equals(
+ "/done/" + (r.nextLong() & Long.MAX_VALUE)));
+
+ return (retUser && retLocalResource && retUrl);
+ }
+
+ };
+
+ verify(mockLocalization).successfulLocalization(argThat(sucLocMatch));
}
}
static class FalseDownload extends FSDownload {
private final long size;
- public FalseDownload(LocalResource resource, long size) {
+ public FalseDownload(org.apache.hadoop.yarn.api.records.LocalResource resource, long size) {
super(null, null, null, resource, null);
this.size = size;
}
@Override
- public Map<LocalResource,Path> call() {
- LocalResource ret = getResource();
- ret.size = size;
+ public Map<org.apache.hadoop.yarn.api.records.LocalResource,Path> call() {
+ org.apache.hadoop.yarn.api.records.LocalResource ret = getResource();
+ ret.setSize(size);
return Collections.singletonMap(ret, new Path("/done/" + size));
}
}
// sigh.
- class LocalResourceMatches extends ArgumentMatcher<LocalResource> {
- final LocalResource rsrc;
- LocalResourceMatches(LocalResource rsrc) {
+ class LocalResourceMatches extends ArgumentMatcher<org.apache.hadoop.yarn.api.records.LocalResource> {
+ final org.apache.hadoop.yarn.api.records.LocalResource rsrc;
+ LocalResourceMatches(org.apache.hadoop.yarn.api.records.LocalResource rsrc) {
this.rsrc = rsrc;
}
@Override
public boolean matches(Object o) {
- return rsrc.timestamp == ((LocalResource)o).timestamp;
+ return rsrc.getTimestamp() == ((org.apache.hadoop.yarn.api.records.LocalResource)o).getTimestamp();
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java Thu Mar 31 22:23:22 2011
@@ -38,13 +38,13 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FSDownload;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import static org.apache.hadoop.fs.CreateFlag.*;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
import org.junit.AfterClass;
import org.junit.Test;
@@ -58,7 +58,7 @@ public class TestFSDownload {
fs.delete(new Path("target", TestFSDownload.class.getSimpleName()), true);
}
- static LocalResource createFile(FileContext files, Path p, int len, Random r)
+ static org.apache.hadoop.yarn.api.records.LocalResource createFile(FileContext files, Path p, int len, Random r)
throws IOException, URISyntaxException {
FSDataOutputStream out = null;
try {
@@ -69,11 +69,11 @@ public class TestFSDownload {
} finally {
if (out != null) out.close();
}
- LocalResource ret = new LocalResource();
- ret.resource = AvroUtil.getYarnUrlFromPath(p);
- ret.size = len;
- ret.type = LocalResourceType.FILE;
- ret.timestamp = files.getFileStatus(p).getModificationTime();
+ org.apache.hadoop.yarn.api.records.LocalResource ret = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(p));
+ ret.setSize(len);
+ ret.setType(LocalResourceType.FILE);
+ ret.setTimestamp(files.getFileStatus(p).getModificationTime());
return ret;
}
@@ -97,7 +97,7 @@ public class TestFSDownload {
int[] sizes = new int[10];
for (int i = 0; i < 10; ++i) {
sizes[i] = rand.nextInt(512) + 512;
- LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
+ org.apache.hadoop.yarn.api.records.LocalResource rsrc = createFile(files, new Path(basedir, "" + i),
sizes[i], rand);
FSDownload fsd =
new FSDownload(files, conf, dirs, rsrc, new Random(sharedSeed));
@@ -105,21 +105,21 @@ public class TestFSDownload {
}
ExecutorService exec = Executors.newSingleThreadExecutor();
- CompletionService<Map<LocalResource,Path>> queue =
- new ExecutorCompletionService<Map<LocalResource,Path>>(exec);
+ CompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> queue =
+ new ExecutorCompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>(exec);
try {
for (FSDownload fsd : pending) {
queue.submit(fsd);
}
- Map<LocalResource,Path> results = new HashMap();
+ Map<org.apache.hadoop.yarn.api.records.LocalResource,Path> results = new HashMap();
for (int i = 0; i < 10; ++i) {
- Future<Map<LocalResource,Path>> result = queue.take();
+ Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> result = queue.take();
results.putAll(result.get());
}
- for (Map.Entry<LocalResource,Path> localized : results.entrySet()) {
+ for (Map.Entry<org.apache.hadoop.yarn.api.records.LocalResource,Path> localized : results.entrySet()) {
assertEquals(
sizes[Integer.valueOf(localized.getValue().getName())],
- localized.getKey().size - 4096 - 16); // bad DU impl + .crc ; sigh
+ localized.getKey().getSize() - 4096 - 16); // bad DU impl + .crc ; sigh
}
} catch (ExecutionException e) {
throw new IOException("Failed exec", e);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResource.java Thu Mar 31 22:23:22 2011
@@ -22,28 +22,29 @@ import java.net.URISyntaxException;
import java.util.Random;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
-import static org.apache.hadoop.yarn.LocalResourceType.*;
-import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceVisibility.*;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestLocalResource {
- static org.apache.hadoop.yarn.LocalResource getYarnResource(Path p, long size,
+ static org.apache.hadoop.yarn.api.records.LocalResource getYarnResource(Path p, long size,
long timestamp, LocalResourceType type, LocalResourceVisibility state)
throws URISyntaxException {
- org.apache.hadoop.yarn.LocalResource ret = new org.apache.hadoop.yarn.LocalResource();
- ret.resource = AvroUtil.getYarnUrlFromURI(p.toUri());
- ret.size = size;
- ret.timestamp = timestamp;
- ret.type = type;
- ret.state = state;
+ org.apache.hadoop.yarn.api.records.LocalResource ret = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromURI(p.toUri()));
+ ret.setSize(size);
+ ret.setTimestamp(timestamp);
+ ret.setType(type);
+ ret.setVisibility(state);
return ret;
}
@@ -70,9 +71,9 @@ public class TestLocalResource {
System.out.println("SEED: " + seed);
long basetime = r.nextLong() >>> 2;
- org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+ org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
- org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+ org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
final LocalResource a = new LocalResource(yA);
LocalResource b = new LocalResource(yA);
@@ -118,12 +119,12 @@ public class TestLocalResource {
r.setSeed(seed);
System.out.println("SEED: " + seed);
long basetime = r.nextLong() >>> 2;
- org.apache.hadoop.yarn.LocalResource yA = getYarnResource(
+ org.apache.hadoop.yarn.api.records.LocalResource yA = getYarnResource(
new Path("http://yak.org:80/foobar"), -1, basetime, FILE, PUBLIC);
final LocalResource a = new LocalResource(yA);
// Path primary
- org.apache.hadoop.yarn.LocalResource yB = getYarnResource(
+ org.apache.hadoop.yarn.api.records.LocalResource yB = getYarnResource(
new Path("http://yak.org:80/foobaz"), -1, basetime, FILE, PUBLIC);
LocalResource b = new LocalResource(yB);
assertTrue(0 > a.compareTo(b));
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResources.java Thu Mar 31 22:23:22 2011
@@ -34,14 +34,15 @@ import static java.util.concurrent.TimeU
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.AppLocalizationRunnerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
-import static org.apache.hadoop.yarn.LocalResourceType.*;
-import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceVisibility.*;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -53,24 +54,24 @@ import static org.mockito.Mockito.*;
public class TestLocalResources {
- private static List<org.apache.hadoop.yarn.LocalResource>
+ private static List<org.apache.hadoop.yarn.api.records.LocalResource>
randResources(Random r, int nRsrc) throws URISyntaxException {
- final List<org.apache.hadoop.yarn.LocalResource> ret =
- new ArrayList<org.apache.hadoop.yarn.LocalResource>(nRsrc);
+ final List<org.apache.hadoop.yarn.api.records.LocalResource> ret =
+ new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>(nRsrc);
Path base = new Path("file:///foo/bar");
long basetime = r.nextLong() >>> 2;
for (int i = 0; i < nRsrc; ++i) {
- org.apache.hadoop.yarn.LocalResource rsrc = new org.apache.hadoop.yarn.LocalResource();
- rsrc.timestamp = basetime + i;
- r.setSeed(rsrc.timestamp);
+ org.apache.hadoop.yarn.api.records.LocalResource rsrc = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.yarn.api.records.LocalResource.class);
+ rsrc.setTimestamp(basetime + i);
+ r.setSeed(rsrc.getTimestamp());
Path p = new Path(base, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
while (r.nextInt(2) == 1) {
p = new Path(p, String.valueOf(r.nextInt(Integer.MAX_VALUE)));
}
- rsrc.resource = AvroUtil.getYarnUrlFromPath(p);
- rsrc.size = -1;
- rsrc.type = r.nextInt(2) == 1 ? FILE : ARCHIVE;
- rsrc.state = PRIVATE;
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(p));
+ rsrc.setSize(-1);
+ rsrc.setType(r.nextInt(2) == 1 ? FILE : ARCHIVE);
+ rsrc.setVisibility(PRIVATE);
System.out.println("RSRC: " + rsrc);
ret.add(rsrc);
@@ -80,7 +81,7 @@ public class TestLocalResources {
private static void verify(
BlockingQueue<Future<Map<LocalResource,Path>>> doneQueue,
- Collection<org.apache.hadoop.yarn.LocalResource> files)
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> files)
throws ExecutionException, InterruptedException, URISyntaxException {
for (Future<Map<LocalResource,Path>> f = doneQueue.poll(); f != null;
f = doneQueue.poll()) {
@@ -88,7 +89,7 @@ public class TestLocalResources {
assertEquals(1, q.size());
for (Map.Entry<LocalResource,Path> loc : q.entrySet()) {
boolean found = false;
- for (org.apache.hadoop.yarn.LocalResource yrsrc : files) {
+ for (org.apache.hadoop.yarn.api.records.LocalResource yrsrc : files) {
LocalResource rsrc = new LocalResource(yrsrc);
found |= rsrc.equals(loc.getKey());
}
@@ -117,13 +118,13 @@ public class TestLocalResources {
}
static void successfulLoc(LocalResourcesTrackerImpl rsrcMap,
- List<org.apache.hadoop.yarn.LocalResource> rsrc)
+ List<org.apache.hadoop.yarn.api.records.LocalResource> rsrc)
throws InterruptedException, URISyntaxException {
long i = 0;
- for (org.apache.hadoop.yarn.LocalResource yRsrc : rsrc) {
- yRsrc.size = ++i;
- rsrcMap.setSuccess(new LocalResource(yRsrc), yRsrc.size,
- new Path("file:///yak/" + yRsrc.timestamp));
+ for (org.apache.hadoop.yarn.api.records.LocalResource yRsrc : rsrc) {
+ yRsrc.setSize(++i);
+ rsrcMap.setSuccess(new LocalResource(yRsrc), yRsrc.getSize(),
+ new Path("file:///yak/" + yRsrc.getTimestamp()));
}
}
@@ -151,7 +152,7 @@ public class TestLocalResources {
System.out.println("SEED: " + seed);
// shared resource map
LocalResourcesTrackerImpl rsrcMap = new LocalResourcesTrackerImpl();
- List<org.apache.hadoop.yarn.LocalResource> resourcesA = randResources(r, NUM_URIS);
+ List<org.apache.hadoop.yarn.api.records.LocalResource> resourcesA = randResources(r, NUM_URIS);
// set up application A mocks
final BlockingQueue<Future<Map<LocalResource,Path>>> doneQueueA =
@@ -163,7 +164,7 @@ public class TestLocalResources {
new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>();
AppLocalizationRunnerImpl mockAppLocB = getMockAppLoc(doneQueueB, "B");
- Collection<org.apache.hadoop.yarn.LocalResource> todoA =
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> todoA =
rsrcMap.register(mockAppLocA, resourcesA);
// ensure no rsrc added until reported back
assertEquals(NUM_URIS, todoA.size());
@@ -181,11 +182,11 @@ public class TestLocalResources {
long seed2 = r.nextLong();
r.setSeed(seed2);
System.out.println("SEED: " + seed2);
- List<org.apache.hadoop.yarn.LocalResource> resourcesB =
+ List<org.apache.hadoop.yarn.api.records.LocalResource> resourcesB =
randResources(r, NUM_URIS >>> 1);
resourcesB.addAll(resourcesA.subList(NUM_URIS >>> 2,
NUM_URIS - (NUM_URIS >>> 2)));
- Collection<org.apache.hadoop.yarn.LocalResource> todoB =
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> todoB =
rsrcMap.register(mockAppLocB, resourcesB);
// all completed A rsrc
assertEquals(NUM_URIS >>> 2, doneQueueB.size());
@@ -211,7 +212,7 @@ public class TestLocalResources {
failedLoc(doneQueueB);
// verify cleared
- Collection<org.apache.hadoop.yarn.LocalResource> todoA2 =
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> todoA2 =
rsrcMap.register(mockAppLocA, Collections.singletonList(
resourcesA.get(NUM_URIS >>> 1)));
assertEquals(1, todoA2.size());
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Mar 31 22:23:22 2011
@@ -20,12 +20,10 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,15 +32,24 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.AMRMProtocol;
-import org.apache.hadoop.yarn.AMResponse;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
@@ -63,9 +70,9 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
private ApplicationTokenSecretManager appTokenManager;
private InetSocketAddress masterServiceAddress;
private Server server;
- private Map<ApplicationID, AMResponse> responseMap =
- new HashMap<ApplicationID, AMResponse>();
- private final AMResponse reboot = new AMResponse();
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private Map<ApplicationId, AMResponse> responseMap = new HashMap<ApplicationId, AMResponse>();
+ private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
private final ASMContext asmContext;
public ApplicationMasterService(ApplicationTokenSecretManager appTokenManager,
@@ -74,8 +81,8 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
this.appTokenManager = appTokenManager;
this.applicationsManager = applicationsManager;
this.rScheduler = scheduler;
- this.reboot.reboot = true;
- this.reboot.containers = new ArrayList<Container>();
+ this.reboot.setReboot(true);
+// this.reboot.containers = new ArrayList<Container>();
this.asmContext = asmContext;
}
@@ -101,59 +108,68 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
this.server.start();
super.start();
}
+
@Override
- public Void registerApplicationMaster(ApplicationMaster applicationMaster)
- throws AvroRemoteException {
+ public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
+ ApplicationMaster applicationMaster = request.getApplicationMaster();
try {
applicationsManager.registerApplicationMaster(applicationMaster);
} catch(IOException ie) {
LOG.info("Exception registering application ", ie);
throw RPCUtil.getRemoteException(ie);
}
- return null;
+ RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
+ return response;
}
@Override
- public Void finishApplicationMaster(ApplicationMaster applicationMaster)
- throws AvroRemoteException {
+ public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
+ ApplicationMaster applicationMaster = request.getApplicationMaster();
try {
applicationsManager.finishApplicationMaster(applicationMaster);
} catch(IOException ie) {
LOG.info("Exception finishing application", ie);
throw RPCUtil.getRemoteException(ie);
}
- return null;
+ FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
+ return response;
}
@Override
- public AMResponse allocate(ApplicationStatus status,
- List<ResourceRequest> ask, List<Container> release)
- throws AvroRemoteException {
+ public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
+ ApplicationStatus status = request.getApplicationStatus();
+ List<ResourceRequest> ask = request.getAskList();
+ List<Container> release = request.getReleaseList();
try {
/* check if its in cache */
synchronized(responseMap) {
- AMResponse lastResponse = responseMap.get(status.applicationId);
+ AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
+ AMResponse lastResponse = responseMap.get(status.getApplicationId());
if (lastResponse == null) {
- LOG.error("Application doesnt exist in cache " + status.applicationId);
- return reboot;
+ LOG.error("Application doesnt exist in cache " + status.getApplicationId());
+ allocateResponse.setAMResponse(reboot);
+ return allocateResponse;
}
- if ((status.responseID + 1) == lastResponse.responseId) {
+ if ((status.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
- return lastResponse;
- } else if (status.responseID + 1 < lastResponse.responseId) {
- LOG.error("Invalid responseid from application " + status.applicationId);
- return reboot;
+ allocateResponse.setAMResponse(lastResponse);
+ return allocateResponse;
+ } else if (status.getResponseId() + 1 < lastResponse.getResponseId()) {
+ LOG.error("Invalid responseid from application " + status.getApplicationId());
+ allocateResponse.setAMResponse(reboot);
+ return allocateResponse;
}
applicationsManager.applicationHeartbeat(status);
- List<Container> containers = rScheduler.allocate(status.applicationId, ask, release);
- AMResponse response = new AMResponse();
- response.containers = containers;
- response.responseId = lastResponse.responseId + 1;
- responseMap.put(status.applicationId, response);
- return response;
+ List<Container> containers = rScheduler.allocate(status.getApplicationId(), ask, release);
+ AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+ response.addAllContainers(containers);
+ response.setResponseId(lastResponse.getResponseId() + 1);
+ responseMap.put(status.getApplicationId(), response);
+ allocateResponse.setAMResponse(response);
+ return allocateResponse;
}
} catch(IOException ie) {
- LOG.info("Error in allocation for " + status.applicationId, ie);
+ LOG.info("Error in allocation for " + status.getApplicationId(), ie);
throw RPCUtil.getRemoteException(ie);
}
}
@@ -169,13 +185,13 @@ AMRMProtocol, EventHandler<ASMEvent<Appl
@Override
public void handle(ASMEvent<ApplicationTrackerEventType> appEvent) {
ApplicationTrackerEventType event = appEvent.getType();
- ApplicationID id = appEvent.getAppContext().getApplicationID();
+ ApplicationId id = appEvent.getAppContext().getApplicationID();
synchronized(responseMap) {
switch (event) {
case ADD:
- AMResponse response = new AMResponse();
- response.containers = null;
- response.responseId = 0;
+ AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
+// response.containers = null;
+ response.setResponseId(0);
responseMap.put(id, response);
break;
case REMOVE:
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Thu Mar 31 22:23:22 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,12 +28,23 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.ClientRMProtocol;
-import org.apache.hadoop.yarn.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
@@ -52,6 +62,7 @@ public class ClientRMService extends Abs
private ApplicationsManager applicationsManager;
private String clientServiceBindAddress;
private Server server;
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
InetSocketAddress clientBindAddress;
public ClientRMService(ApplicationsManager applicationsManager,
@@ -89,42 +100,49 @@ public class ClientRMService extends Abs
}
@Override
- public ApplicationID getNewApplicationId() throws AvroRemoteException {
- return applicationsManager.getNewApplicationID();
+ public GetNewApplicationIdResponse getNewApplicationId(GetNewApplicationIdRequest request) throws YarnRemoteException {
+ GetNewApplicationIdResponse response = recordFactory.newRecordInstance(GetNewApplicationIdResponse.class);
+ response.setApplicationId(applicationsManager.getNewApplicationID());
+ return response;
}
-
+
@Override
- public ApplicationMaster getApplicationMaster(ApplicationID applicationId)
- throws AvroRemoteException {
- return applicationsManager.getApplicationMaster(applicationId);
+ public GetApplicationMasterResponse getApplicationMaster(GetApplicationMasterRequest request) throws YarnRemoteException {
+ ApplicationId applicationId = request.getApplicationId();
+ GetApplicationMasterResponse response = recordFactory.newRecordInstance(GetApplicationMasterResponse.class);
+ response.setApplicationMaster(applicationsManager.getApplicationMaster(applicationId));
+ return response;
}
- @Override
- public Void submitApplication(ApplicationSubmissionContext context)
- throws AvroRemoteException {
+ public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnRemoteException {
+ ApplicationSubmissionContext context = request.getApplicationSubmissionContext();
try {
applicationsManager.submitApplication(context);
} catch (IOException ie) {
LOG.info("Exception in submitting application", ie);
throw RPCUtil.getRemoteException(ie);
}
- return null;
+ SubmitApplicationResponse response = recordFactory.newRecordInstance(SubmitApplicationResponse.class);
+ return response;
}
@Override
- public Void finishApplication(ApplicationID applicationId)
- throws AvroRemoteException {
+ public FinishApplicationResponse finishApplication(FinishApplicationRequest request) throws YarnRemoteException {
+ ApplicationId applicationId = request.getApplicationId();
try {
applicationsManager.finishApplication(applicationId);
} catch(IOException ie) {
LOG.info("Error finishing application ", ie);
}
- return null;
+ FinishApplicationResponse response = recordFactory.newRecordInstance(FinishApplicationResponse.class);
+ return response;
}
@Override
- public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException {
- return clusterInfo.getClusterMetrics();
+ public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) throws YarnRemoteException {
+ GetClusterMetricsResponse response = recordFactory.newRecordInstance(GetClusterMetricsResponse.class);
+ response.setClusterMetrics(clusterInfo.getClusterMetrics());
+ return response;
}
@Override
@@ -134,4 +152,4 @@ public class ClientRMService extends Abs
}
super.stop();
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMLauncher.java Thu Mar 31 22:23:22 2011
@@ -40,15 +40,20 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
@@ -68,6 +73,7 @@ public class AMLauncher implements Runna
private final AppContext master;
private final Configuration conf;
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private ApplicationTokenSecretManager applicationTokenSecretManager;
private ClientToAMSecretManager clientToAMSecretManager;
private AMLauncherEventType event;
@@ -91,51 +97,59 @@ public class AMLauncher implements Runna
}
private void connect() throws IOException {
- ContainerID masterContainerID = master.getMasterContainer().id;
+ ContainerId masterContainerID = master.getMasterContainer().getId();
+
containerMgrProxy =
- getContainerMgrProxy(masterContainerID.appID);
+ getContainerMgrProxy(masterContainerID.getAppId());
}
private void launch() throws IOException {
connect();
- ContainerID masterContainerID = master.getMasterContainer().id;
+ ContainerId masterContainerID = master.getMasterContainer().getId();
ApplicationSubmissionContext applicationContext =
master.getSubmissionContext();
LOG.info("Setting up container " + master.getMasterContainer()
+ " for AM " + master.getMaster());
ContainerLaunchContext launchContext =
getLaunchSpec(applicationContext, masterContainerID);
- containerMgrProxy.startContainer(launchContext);
+ StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+ request.setContainerLaunchContext(launchContext);
+ containerMgrProxy.startContainer(request);
LOG.info("Done launching container " + master.getMasterContainer()
+ " for AM " + master.getMaster());
}
private void cleanup() throws IOException {
connect();
- ContainerID containerId = master.getMasterContainer().id;
- containerMgrProxy.stopContainer(containerId);
- containerMgrProxy.cleanupContainer(containerId);
+ ContainerId containerId = master.getMasterContainer().getId();
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(containerId);
+ containerMgrProxy.stopContainer(stopRequest);
+
+ CleanupContainerRequest cleanupRequest = recordFactory.newRecordInstance(CleanupContainerRequest.class);
+ cleanupRequest.setContainerId(containerId);
+ containerMgrProxy.cleanupContainer(cleanupRequest);
}
private ContainerManager getContainerMgrProxy(
- final ApplicationID applicationID) throws IOException {
+ final ApplicationId applicationID) throws IOException {
Container container = master.getMasterContainer();
- final String containerManagerBindAddress = container.hostName.toString();
+ final String containerManagerBindAddress = container.getHostName();
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser("TODO"); // TODO
if (UserGroupInformation.isSecurityEnabled()) {
- ContainerToken containerToken = container.containerToken;
+ ContainerToken containerToken = container.getContainerToken();
Token<ContainerTokenIdentifier> token =
new Token<ContainerTokenIdentifier>(
- containerToken.identifier.array(),
- containerToken.password.array(), new Text(
- containerToken.kind.toString()), new Text(
- containerToken.service.toString()));
+ containerToken.getIdentifier().array(),
+ containerToken.getPassword().array(), new Text(
+ containerToken.getKind()), new Text(
+ containerToken.getService()));
currentUser.addToken(token);
}
return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
@@ -149,49 +163,49 @@ public class AMLauncher implements Runna
private ContainerLaunchContext getLaunchSpec(
ApplicationSubmissionContext applicationMasterContext,
- ContainerID containerID) throws IOException {
+ ContainerId containerID) throws IOException {
// Construct the actual Container
- ContainerLaunchContext container = new ContainerLaunchContext();
- container.command = applicationMasterContext.command;
+ ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ container.addAllCommands(applicationMasterContext.getCommandList());
StringBuilder mergedCommand = new StringBuilder();
- for (CharSequence str : container.command) {
+ for (String str : container.getCommandList()) {
mergedCommand.append(str).append(" ");
}
LOG.info("Command to launch container " +
containerID + " : " + mergedCommand);
- container.env = applicationMasterContext.environment;
+ container.addAllEnv(applicationMasterContext.getAllEnvironment());
- container.env.putAll(setupTokensInEnv(applicationMasterContext));
+ container.addAllEnv(setupTokensInEnv(applicationMasterContext));
// Construct the actual Container
- container.id = containerID;
- container.user = applicationMasterContext.user;
- container.resource = applicationMasterContext.masterCapability;
- container.resources = applicationMasterContext.resources_todo;
- container.containerTokens = applicationMasterContext.fsTokens_todo;
+ container.setContainerId(containerID);
+ container.setUser(applicationMasterContext.getUser());
+ container.setResource(applicationMasterContext.getMasterCapability());
+ container.addAllLocalResources(applicationMasterContext.getAllResourcesTodo());
+ container.setContainerTokens(applicationMasterContext.getFsTokensTodo());
return container;
}
- private Map<CharSequence, CharSequence> setupTokensInEnv(
+ private Map<String, String> setupTokensInEnv(
ApplicationSubmissionContext asc)
throws IOException {
- Map<CharSequence, CharSequence> env =
- new HashMap<CharSequence, CharSequence>();
+ Map<String, String> env =
+ new HashMap<String, String>();
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
- if (asc.fsTokens_todo != null) {
+ if (asc.getFsTokensTodo() != null) {
// TODO: Don't do this kind of checks everywhere.
- dibb.reset(asc.fsTokens_todo);
+ dibb.reset(asc.getFsTokensTodo());
credentials.readTokenStorageStream(dibb);
}
ApplicationTokenIdentifier id =
- new ApplicationTokenIdentifier(master.getMasterContainer().id.appID);
+ new ApplicationTokenIdentifier(master.getMasterContainer().getId().getAppId());
Token<ApplicationTokenIdentifier> token =
new Token<ApplicationTokenIdentifier>(id,
this.applicationTokenSecretManager);
@@ -213,11 +227,11 @@ public class AMLauncher implements Runna
credentials.addToken(new Text(resolvedAddr), token);
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
- asc.fsTokens_todo = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
ApplicationTokenIdentifier identifier =
new ApplicationTokenIdentifier(
- this.master.getMaster().applicationId);
+ this.master.getMaster().getApplicationId());
SecretKey clientSecretKey =
this.clientToAMSecretManager.getMasterKey(identifier);
String encoded =
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Thu Mar 31 22:23:22 2011
@@ -31,15 +31,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.ASMContext;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ASMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.events.ApplicationMasterEvents.ApplicationEventType;
@@ -59,25 +61,25 @@ public class AMTracker extends AbstractS
private long amExpiryInterval;
@SuppressWarnings("rawtypes")
private EventHandler handler;
-
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private int amMaxRetries;
private final ASMContext asmContext;
- private final Map<ApplicationID, ApplicationMasterInfo> applications =
- new ConcurrentHashMap<ApplicationID, ApplicationMasterInfo>();
+ private final Map<ApplicationId, ApplicationMasterInfo> applications =
+ new ConcurrentHashMap<ApplicationId, ApplicationMasterInfo>();
private TreeSet<ApplicationStatus> amExpiryQueue =
new TreeSet<ApplicationStatus>(
new Comparator<ApplicationStatus>() {
public int compare(ApplicationStatus p1, ApplicationStatus p2) {
- if (p1.lastSeen < p2.lastSeen) {
+ if (p1.getLastSeen() < p2.getLastSeen()) {
return -1;
- } else if (p1.lastSeen > p2.lastSeen) {
+ } else if (p1.getLastSeen() > p2.getLastSeen()) {
return 1;
} else {
- return (p1.applicationId.id -
- p2.applicationId.id);
+ return (p1.getApplicationId().getId() -
+ p2.getApplicationId().getId());
}
}
}
@@ -126,7 +128,7 @@ public class AMTracker extends AbstractS
* its alright. We do not want to hold a hold on applications while going
* through the expiry queue.
*/
- List<ApplicationID> expired = new ArrayList<ApplicationID>();
+ List<ApplicationId> expired = new ArrayList<ApplicationId>();
while (!stop) {
ApplicationStatus leastRecent;
long now = System.currentTimeMillis();
@@ -134,19 +136,19 @@ public class AMTracker extends AbstractS
synchronized(amExpiryQueue) {
while ((amExpiryQueue.size() > 0) &&
(leastRecent = amExpiryQueue.first()) != null &&
- ((now - leastRecent.lastSeen) >
+ ((now - leastRecent.getLastSeen()) >
amExpiryInterval)) {
amExpiryQueue.remove(leastRecent);
ApplicationMasterInfo info;
synchronized(applications) {
- info = applications.get(leastRecent.applicationId);
+ info = applications.get(leastRecent.getApplicationId());
}
if (info == null) {
continue;
}
ApplicationStatus status = info.getStatus();
- if ((now - status.lastSeen) > amExpiryInterval) {
- expired.add(status.applicationId);
+ if ((now - status.getLastSeen()) > amExpiryInterval) {
+ expired.add(status.getApplicationId());
} else {
amExpiryQueue.add(status);
}
@@ -161,8 +163,8 @@ public class AMTracker extends AbstractS
}
}
- protected void expireAMs(List<ApplicationID> toExpire) {
- for (ApplicationID app: toExpire) {
+ protected void expireAMs(List<ApplicationId> toExpire) {
+ for (ApplicationId app: toExpire) {
ApplicationMasterInfo am = null;
synchronized (applications) {
am = applications.get(app);
@@ -196,7 +198,7 @@ public class AMTracker extends AbstractS
ApplicationEventType.ALLOCATE, applicationMaster));
}
- public void finish(ApplicationID application) {
+ public void finish(ApplicationId application) {
ApplicationMasterInfo masterInfo = null;
synchronized(applications) {
masterInfo = applications.get(application);
@@ -205,7 +207,7 @@ public class AMTracker extends AbstractS
masterInfo));
}
- public ApplicationMasterInfo get(ApplicationID applicationId) {
+ public ApplicationMasterInfo get(ApplicationId applicationId) {
ApplicationMasterInfo masterInfo = null;
synchronized (applications) {
masterInfo = applications.get(applicationId);
@@ -215,7 +217,7 @@ public class AMTracker extends AbstractS
/* As of now we dont remove applications from the RM */
/* TODO we need to decide on a strategy for expiring done applications */
- public void remove(ApplicationID applicationId) {
+ public void remove(ApplicationId applicationId) {
synchronized (applications) {
applications.remove(applicationId);
}
@@ -238,7 +240,7 @@ public class AMTracker extends AbstractS
}
}
- public void kill(ApplicationID applicationID) {
+ public void kill(ApplicationId applicationID) {
ApplicationMasterInfo masterInfo = null;
synchronized(applications) {
@@ -253,12 +255,12 @@ public class AMTracker extends AbstractS
* machine.
*/
private static class TrackerAppContext implements AppContext {
- private final ApplicationID appID;
+ private final ApplicationId appID;
private final ApplicationMaster master;
private final UnsupportedOperationException notimplemented;
public TrackerAppContext(
- ApplicationID appId, ApplicationMaster master) {
+ ApplicationId appId, ApplicationMaster master) {
this.appID = appId;
this.master = master;
this.notimplemented = new NotImplementedException();
@@ -273,12 +275,12 @@ public class AMTracker extends AbstractS
throw notimplemented;
}
@Override
- public ApplicationID getApplicationID() {
+ public ApplicationId getApplicationID() {
return appID;
}
@Override
public ApplicationStatus getStatus() {
- return master.status;
+ return master.getStatus();
}
@Override
public ApplicationMaster getMaster() {
@@ -294,7 +296,7 @@ public class AMTracker extends AbstractS
}
@Override
public long getLastSeen() {
- return master.status.lastSeen;
+ return master.getStatus().getLastSeen();
}
@Override
public String getName() {
@@ -312,19 +314,19 @@ public class AMTracker extends AbstractS
}
public void heartBeat(ApplicationStatus status) {
- ApplicationMaster master = new ApplicationMaster();
- master.status = status;
- master.applicationId = status.applicationId;
- TrackerAppContext context = new TrackerAppContext(status.applicationId, master);
+ ApplicationMaster master = recordFactory.newRecordInstance(ApplicationMaster.class);
+ master.setStatus(status);
+ master.setApplicationId(status.getApplicationId());
+ TrackerAppContext context = new TrackerAppContext(status.getApplicationId(), master);
handler.handle(new ASMEvent<ApplicationEventType>(ApplicationEventType.STATUSUPDATE,
context));
}
public void registerMaster(ApplicationMaster applicationMaster) {
- applicationMaster.status.lastSeen = System.currentTimeMillis();
+ applicationMaster.getStatus().setLastSeen(System.currentTimeMillis());
ApplicationMasterInfo master = null;
synchronized(applications) {
- master = applications.get(applicationMaster.applicationId);
+ master = applications.get(applicationMaster.getApplicationId());
}
LOG.info("AM registration " + master.getMaster());
TrackerAppContext registrationContext = new TrackerAppContext(
@@ -335,7 +337,7 @@ public class AMTracker extends AbstractS
@Override
public void handle(ASMEvent<ApplicationEventType> event) {
- ApplicationID appID = event.getAppContext().getApplicationID();
+ ApplicationId appID = event.getAppContext().getApplicationID();
ApplicationMasterInfo masterInfo = null;
synchronized(applications) {
masterInfo = applications.get(appID);
@@ -372,4 +374,4 @@ public class AMTracker extends AbstractS
}
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AppContext.java Thu Mar 31 22:23:22 2011
@@ -18,12 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationStatus;
-import org.apache.hadoop.yarn.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.Container;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
/**
* The context of an application.
@@ -33,7 +33,7 @@ public interface AppContext {
/**
* the application submission context for this application.
- * @return the {@link ApplicationSubmissionContext} for the submitted
+ * @return the {@link ApplicationSubmissionContext} for the submitted
* application.
*/
public ApplicationSubmissionContext getSubmissionContext();
@@ -48,17 +48,17 @@ public interface AppContext {
* get the application ID for this application
* @return the application id for this application
*/
- public ApplicationID getApplicationID();
+ public ApplicationId getApplicationID();
/**
* get the status of the application
- * @return the {@link ApplicationStatus} of this application
+ * @return the {@link ApplicationStatus} of this application
*/
public ApplicationStatus getStatus();
/**
* the application master for this application.
- * @return the {@link ApplicationMaster} for this application
+ * @return the {@link ApplicationMaster} for this application
*/
public ApplicationMaster getMaster();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationMasterHandler.java Thu Mar 31 22:23:22 2011
@@ -22,8 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
/**
* Interface the application master use for application master status