You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/01 00:23:34 UTC
svn commit: r1087462 [16/20] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/...
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java Thu Mar 31 22:23:22 2011
@@ -60,17 +60,23 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
-import org.apache.hadoop.yarn.util.AvroUtil;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalizationProtocol;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
/**
* Internal class responsible for initializing the job, not intended for users.
@@ -101,8 +107,9 @@ public class ApplicationLocalizer {
private final Configuration conf;
private final List<Path> localDirs;
private final LocalDirAllocator lDirAlloc;
- private final List<org.apache.hadoop.yarn.LocalResource> privateResources;
- private final List<org.apache.hadoop.yarn.LocalResource> applicationResources;
+ private final List<org.apache.hadoop.yarn.api.records.LocalResource> privateResources;
+ private final List<org.apache.hadoop.yarn.api.records.LocalResource> applicationResources;
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
public ApplicationLocalizer(String user, String appId, Path logDir,
List<Path> localDirs) throws IOException {
@@ -125,13 +132,13 @@ public class ApplicationLocalizer {
this.conf = new Configuration();
this.localDirs = setLocalDirs(user, conf, localDirs);
lDirAlloc = new LocalDirAllocator(NM_LOCAL_DIR);
- privateResources = new ArrayList<LocalResource>();
- applicationResources = new ArrayList<LocalResource>();
+ privateResources = new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>();
+ applicationResources = new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>();
}
public static void writeLaunchEnv(OutputStream out,
- Map<CharSequence,CharSequence> environment, Map<Path,String> resources,
- List<CharSequence> command, List<Path> appDirs)
+ Map<String,String> environment, Map<Path,String> resources,
+ List<String> command, List<Path> appDirs)
throws IOException {
ShellScriptBuilder sb = new ShellScriptBuilder();
if (System.getenv("YARN_HOME") != null) {
@@ -140,7 +147,7 @@ public class ApplicationLocalizer {
sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
StringUtils.join(",", appDirs));
if (environment != null) {
- for (Map.Entry<CharSequence,CharSequence> env : environment.entrySet()) {
+ for (Map.Entry<String,String> env : environment.entrySet()) {
sb.env(env.getKey().toString(), env.getValue().toString());
}
}
@@ -154,7 +161,7 @@ public class ApplicationLocalizer {
cmd.add("/bin/bash ");
cmd.add("-c ");
cmd.add("\"");
- for (CharSequence cs : command) {
+ for (String cs : command) {
cmd.add(cs.toString());
cmd.add(" ");
}
@@ -172,12 +179,11 @@ public class ApplicationLocalizer {
}
static void writeResourceDescription(OutputStream out,
- Collection<LocalResource> rsrc) throws IOException {
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> rsrc) throws IOException {
try {
- BinaryEncoder encoder = new BinaryEncoder(out);
- SpecificDatumWriter writer = new SpecificDatumWriter(LocalResource.class);
- for (LocalResource r : rsrc) {
- writer.write(r, encoder);
+ for (org.apache.hadoop.yarn.api.records.LocalResource r : rsrc) {
+ LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) r;
+ rsrcPb.getProto().writeDelimitedTo(out);
}
} finally {
if (out != null) {
@@ -185,28 +191,62 @@ public class ApplicationLocalizer {
}
}
}
+
+ //TODO PB This part becomes dependent on the PB implementation.
+ //Add an interface which makes this independent of the serialization layer being used.
+
+// static void writeResourceDescription(OutputStream out,
+// Collection<org.apache.hadoop.yarn.api.records.LocalResource> rsrc) throws IOException {
+// try {
+// BinaryEncoder encoder = new BinaryEncoder(out);
+// SpecificDatumWriter writer = new SpecificDatumWriter(org.apache.hadoop.yarn.api.records.LocalResource.class);
+// for (org.apache.hadoop.yarn.api.records.LocalResource r : rsrc) {
+// writer.write(r, encoder);
+// }
+// } finally {
+// if (out != null) {
+// out.close();
+// }
+// }
+// }
private void readResourceDescription(InputStream in) throws IOException {
- BinaryDecoder decoder =
- DecoderFactory.defaultFactory().createBinaryDecoder(in, null);
- SpecificDatumReader<LocalResource> reader =
- new SpecificDatumReader<LocalResource>(LocalResource.class);
- while (!decoder.isEnd()) {
- LocalResource rsrc = reader.read(null, decoder);
- switch (rsrc.state) {
+ while (in.available() != 0) {
+ org.apache.hadoop.yarn.api.records.LocalResource rsrc = new LocalResourcePBImpl(LocalResourceProto.parseDelimitedFrom(in));
+ switch (rsrc.getVisibility()) {
case PRIVATE:
privateResources.add(rsrc);
break;
- // TODO: Commented to put everything in privateResources for now?
- //case APPLICATION:
- // applicationResources.add(rsrc);
- // break;
default:
privateResources.add(rsrc);
break;
}
}
}
+
+ //TODO PB This part becomes dependent on the PB implementation.
+ //Add an interface which makes this independent of the serialization layer being used.
+// private void readResourceDescription(InputStream in) throws IOException {
+// BinaryDecoder decoder =
+// DecoderFactory.defaultFactory().createBinaryDecoder(in, null);
+// SpecificDatumReader<org.apache.hadoop.yarn.api.records.LocalResource> reader =
+// new SpecificDatumReader<org.apache.hadoop.yarn.api.records.LocalResource>(org.apache.hadoop.yarn.api.records.LocalResource.class);
+// while (!decoder.isEnd()) {
+// org.apache.hadoop.yarn.api.records.LocalResource rsrc = reader.read(null, decoder);
+// switch (rsrc.getVisibility()) {
+// case PRIVATE:
+// privateResources.add(rsrc);
+// break;
+// // TODO: Commented to put everything in privateResources for now?
+// //case APPLICATION:
+// // applicationResources.add(rsrc);
+// // break;
+// default:
+// privateResources.add(rsrc);
+// break;
+// }
+// }
+// }
private static List<Path> setLocalDirs(String user, Configuration conf,
List<Path> localdirs) throws IOException {
@@ -233,7 +273,7 @@ public class ApplicationLocalizer {
return cacheDirs;
}
- FSDownload download(LocalDirAllocator lda, LocalResource rsrc)
+ FSDownload download(LocalDirAllocator lda, org.apache.hadoop.yarn.api.records.LocalResource rsrc)
throws IOException {
return new FSDownload(conf, lda, rsrc);
}
@@ -259,33 +299,38 @@ public class ApplicationLocalizer {
pull(applicationLDA, applicationResources, nodeManager);
}
- private void pull(LocalDirAllocator lda, Collection<LocalResource> resources,
+ private void pull(LocalDirAllocator lda, Collection<org.apache.hadoop.yarn.api.records.LocalResource> resources,
LocalizationProtocol nodeManager)
throws IOException, InterruptedException, YarnRemoteException {
ExecutorService exec = Executors.newSingleThreadExecutor();
- CompletionService<Map<LocalResource,Path>> queue =
- new ExecutorCompletionService<Map<LocalResource,Path>>(exec);
- Map<Future<Map<LocalResource,Path>>, LocalResource> pending =
- new HashMap<Future<Map<LocalResource,Path>>, LocalResource>();
- for (LocalResource rsrc : resources) {
+ CompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> queue =
+ new ExecutorCompletionService<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>(exec);
+ Map<Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>, org.apache.hadoop.yarn.api.records.LocalResource> pending =
+ new HashMap<Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>>, org.apache.hadoop.yarn.api.records.LocalResource>();
+ for (org.apache.hadoop.yarn.api.records.LocalResource rsrc : resources) {
FSDownload dThread = download(lda, rsrc);
pending.put(queue.submit(dThread), rsrc);
}
try {
for (int i = 0, n = resources.size(); i < n; ++i) {
- Future<Map<LocalResource,Path>> result = queue.take();
+ Future<Map<org.apache.hadoop.yarn.api.records.LocalResource,Path>> result = queue.take();
try {
- Map<LocalResource,Path> localized = result.get();
- for (Map.Entry<LocalResource,Path> local : result.get().entrySet()) {
- nodeManager.successfulLocalization(user, local.getKey(),
- AvroUtil.getYarnUrlFromPath(local.getValue()));
+ Map<org.apache.hadoop.yarn.api.records.LocalResource,Path> localized = result.get();
+ for (Map.Entry<org.apache.hadoop.yarn.api.records.LocalResource,Path> local : result.get().entrySet()) {
+ SuccessfulLocalizationRequest successfulLocRequest = recordFactory.newRecordInstance(SuccessfulLocalizationRequest.class);
+ successfulLocRequest.setUser(user);
+ successfulLocRequest.setResource(local.getKey());
+ successfulLocRequest.setPath(ConverterUtils.getYarnUrlFromPath(local.getValue()));
+ nodeManager.successfulLocalization(successfulLocRequest);
pending.remove(result);
}
} catch (ExecutionException e) {
// TODO: Shouldn't we continue localizing other paths?
- nodeManager.failedLocalization(
- user, pending.get(result),
- RPCUtil.getRemoteException(e.getCause()));
+ FailedLocalizationRequest failedLocRequest = recordFactory.newRecordInstance(FailedLocalizationRequest.class);
+ failedLocRequest.setUser(user);
+ failedLocRequest.setResource(pending.get(result));
+ failedLocRequest.setException(RPCUtil.getRemoteException(e.getCause()));
+ nodeManager.failedLocalization(failedLocRequest);
throw new IOException("Failed to localize " +
pending.get(result), e);
}
@@ -293,10 +338,13 @@ public class ApplicationLocalizer {
} finally {
YarnRemoteException e = RPCUtil.getRemoteException("Localization failed");
exec.shutdownNow();
- for (LocalResource rsrc : pending.values()) {
+ for (org.apache.hadoop.yarn.api.records.LocalResource rsrc : pending.values()) {
try {
- nodeManager.failedLocalization(
- user, rsrc, RPCUtil.getRemoteException(e));
+ FailedLocalizationRequest failedLocRequest = recordFactory.newRecordInstance(FailedLocalizationRequest.class);
+ failedLocRequest.setUser(user);
+ failedLocRequest.setResource(rsrc);
+ failedLocRequest.setException(RPCUtil.getRemoteException(e));
+ nodeManager.failedLocalization(failedLocRequest);
} catch (YarnRemoteException error) {
LOG.error("Failure cancelling localization", error);
}
@@ -498,8 +546,8 @@ public class ApplicationLocalizer {
out.append(sb);
}
- public void line(CharSequence... command) {
- for (CharSequence s : command) {
+ public void line(String... command) {
+ for (String s : command) {
sb.append(s);
}
sb.append("\n");
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java Thu Mar 31 22:23:22 2011
@@ -38,11 +38,11 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.RunJar;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import static org.apache.hadoop.fs.Options.*;
-import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResource;
/**
* Download a single URL to the local disk.
@@ -81,9 +81,9 @@ public class FSDownload implements Calla
private Path copy(Path sCopy, Path dstdir) throws IOException {
Path dCopy = new Path(dstdir, sCopy.getName() + ".tmp");
FileStatus sStat = files.getFileStatus(sCopy);
- if (sStat.getModificationTime() != resource.timestamp) {
+ if (sStat.getModificationTime() != resource.getTimestamp()) {
throw new IOException("Resource " + sCopy +
- " changed on src filesystem (expected " + resource.timestamp +
+ " changed on src filesystem (expected " + resource.getTimestamp() +
", was " + sStat.getModificationTime());
}
files.util().copy(sCopy, dCopy);
@@ -92,7 +92,7 @@ public class FSDownload implements Calla
private long unpack(File localrsrc, File dst) throws IOException {
File destDir = new File(localrsrc.getParent());
- switch (resource.type) {
+ switch (resource.getType()) {
case ARCHIVE:
String lowerDst = dst.getName().toLowerCase();
if (lowerDst.endsWith(".jar")) {
@@ -120,7 +120,7 @@ public class FSDownload implements Calla
public Map<LocalResource,Path> call() throws IOException {
Path sCopy;
try {
- sCopy = AvroUtil.getPathFromYarnURL(resource.resource);
+ sCopy = ConverterUtils.getPathFromYarnURL(resource.getResource());
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
@@ -140,8 +140,7 @@ public class FSDownload implements Calla
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
try {
Path dTmp = files.makeQualified(copy(sCopy, dst_work));
- resource.size =
- unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+ resource.setSize(unpack(new File(dTmp.toUri()), new File(dFinal.toUri())));
files.rename(dst_work, dst, Rename.OVERWRITE);
} catch (IOException e) {
try { files.delete(dst, true); } catch (IOException ignore) { }
@@ -162,15 +161,15 @@ public class FSDownload implements Calla
}
private static long getEstimatedSize(LocalResource rsrc) {
- if (rsrc.size < 0) {
+ if (rsrc.getSize() < 0) {
return -1;
}
- switch (rsrc.type) {
+ switch (rsrc.getType()) {
case ARCHIVE:
- return 5 * rsrc.size;
+ return 5 * rsrc.getSize();
case FILE:
default:
- return rsrc.size;
+ return rsrc.getSize();
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java Thu Mar 31 22:23:22 2011
@@ -21,12 +21,12 @@ package org.apache.hadoop.yarn.server.no
import java.net.URISyntaxException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.LocalResourceType;
/**
- * A comparable {@link org.apache.hadoop.yarn.LocalResource}.
+ * A comparable {@link org.apache.hadoop.yarn.api.records.LocalResource}.
*
*/
class LocalResource implements Comparable<LocalResource> {
@@ -40,11 +40,11 @@ class LocalResource implements Comparabl
* @param resource
* @throws URISyntaxException
*/
- public LocalResource(org.apache.hadoop.yarn.LocalResource resource)
+ public LocalResource(org.apache.hadoop.yarn.api.records.LocalResource resource)
throws URISyntaxException {
- this.loc = AvroUtil.getPathFromYarnURL(resource.resource);
- this.timestamp = resource.timestamp;
- this.type = resource.type;
+ this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource());
+ this.timestamp = resource.getTimestamp();
+ this.type = resource.getType();
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Thu Mar 31 22:23:22 2011
@@ -22,13 +22,13 @@ import java.net.URISyntaxException;
import java.util.Collection;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
public interface LocalResourcesTracker {
- Collection<org.apache.hadoop.yarn.LocalResource> register(
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> register(
AppLocalizationRunnerImpl appLocalizationRunner,
- Collection<org.apache.hadoop.yarn.LocalResource> values)
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> values)
throws URISyntaxException;
void setSuccess(LocalResource localRsrc, long size, Path pathFromYarnURL)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Thu Mar 31 22:23:22 2011
@@ -34,7 +34,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
@@ -67,13 +68,13 @@ class LocalResourcesTrackerImpl implemen
// TODO replace w/ queue over RPC
/** @return Resources not present in this bundle */
- public Collection<org.apache.hadoop.yarn.LocalResource> register(
+ public Collection<org.apache.hadoop.yarn.api.records.LocalResource> register(
AppLocalizationRunnerImpl app,
- Collection<org.apache.hadoop.yarn.LocalResource> rsrcs)
+ Collection<org.apache.hadoop.yarn.api.records.LocalResource> rsrcs)
throws URISyntaxException {
- ArrayList<org.apache.hadoop.yarn.LocalResource> ret =
- new ArrayList<org.apache.hadoop.yarn.LocalResource>(rsrcs.size());
- for (final org.apache.hadoop.yarn.LocalResource yrsrc : rsrcs) {
+ ArrayList<org.apache.hadoop.yarn.api.records.LocalResource> ret =
+ new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>(rsrcs.size());
+ for (final org.apache.hadoop.yarn.api.records.LocalResource yrsrc : rsrcs) {
final LocalizedResource cand =
new LocalizedResource(new Callable<Map<LocalResource,Path>>() {
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Mar 31 22:23:22 2011
@@ -46,16 +46,21 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.LocalizationProtocol;
-import org.apache.hadoop.yarn.URL;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.YarnRemoteException;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -68,7 +73,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.api.records.URL;
+
public class ResourceLocalizationService extends AbstractService
implements EventHandler<LocalizerEvent>, LocalizationProtocol {
@@ -87,6 +94,7 @@ public class ResourceLocalizationService
private final DeletionService delService;
private final ExecutorService appLocalizerThreadPool =
Executors.newCachedThreadPool();
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
/**
* Map of private resources of users.
@@ -184,10 +192,12 @@ public class ResourceLocalizationService
* @param resource Resource localized
* @param path Location on the local filesystem, or null if failed
*/
+
@Override
- public Void successfulLocalization(CharSequence user,
- org.apache.hadoop.yarn.LocalResource resource, URL path)
- throws YarnRemoteException {
+ public SuccessfulLocalizationResponse successfulLocalization(SuccessfulLocalizationRequest request) throws YarnRemoteException {
+ String user = request.getUser();
+ org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource();
+ URL path = request.getPath();
// TODO validate request
LocalResourcesTracker userCache = privateRsrc.get(user.toString());
if (null == userCache) {
@@ -195,17 +205,18 @@ public class ResourceLocalizationService
}
try {
userCache.setSuccess(new LocalResource(resource),
- resource.size, AvroUtil.getPathFromYarnURL(path));
+ resource.getSize(), ConverterUtils.getPathFromYarnURL(path));
} catch (Exception e) {
throw RPCUtil.getRemoteException(e);
}
- return null;
+ SuccessfulLocalizationResponse response = recordFactory.newRecordInstance(SuccessfulLocalizationResponse.class);
+ return response;
}
- @Override
- public Void failedLocalization(CharSequence user,
- org.apache.hadoop.yarn.LocalResource resource, YarnRemoteException cause)
- throws YarnRemoteException {
+ public FailedLocalizationResponse failedLocalization(FailedLocalizationRequest request) throws YarnRemoteException {
+ String user = request.getUser();
+ org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource();
+ YarnRemoteException cause = request.getException();
LocalResourcesTracker userCache = privateRsrc.get(user.toString());
if (null == userCache) {
throw RPCUtil.getRemoteException("Unknown user: " + user);
@@ -215,7 +226,8 @@ public class ResourceLocalizationService
} catch (Exception e) {
throw RPCUtil.getRemoteException(e);
}
- return null;
+ FailedLocalizationResponse response = recordFactory.newRecordInstance(FailedLocalizationResponse.class);
+ return response;
}
@Override
@@ -271,7 +283,7 @@ public class ResourceLocalizationService
// Delete the container directories
userName = container.getUser();
String containerIDStr = container.toString();
- appIDStr = AvroUtil.toString(container.getContainerID().appID);
+ appIDStr = ConverterUtils.toString(container.getContainerID().getAppId());
for (Path localDir : localDirs) {
Path usersdir = new Path(localDir, ApplicationLocalizer.USERCACHE);
Path userdir =
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/LocalizationProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/LocalizationProtocol.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/LocalizationProtocol.proto (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/LocalizationProtocol.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,12 @@
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "LocalizationProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_server_nodemanager_service_protos.proto";
+
+
+service LocalizationProtocolService {
+ rpc successfulLocalization(SuccessfulLocalizationRequestProto) returns (SuccessfulLocalizationResponseProto);
+ rpc failedLocalization(FailedLocalizationRequestProto) returns (FailedLocalizationResponseProto);
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,22 @@
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "YarnServerNodemanagerServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+
+message SuccessfulLocalizationRequestProto {
+ optional string user = 1;
+ optional LocalResourceProto resource = 2;
+ optional URLProto path = 3;
+}
+message SuccessfulLocalizationResponseProto {
+}
+
+message FailedLocalizationRequestProto {
+ optional string user = 1;
+ optional LocalResourceProto resource = 2;
+  optional YarnRemoteExceptionProto exception = 3;
+}
+message FailedLocalizationResponseProto {
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Thu Mar 31 22:23:22 2011
@@ -22,14 +22,17 @@ import java.util.HashMap;
import junit.framework.Assert;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.ContainerStatus;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -51,6 +54,8 @@ public class DummyContainerManager exten
private static final Log LOG = LogFactory
.getLog(DummyContainerManager.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
public DummyContainerManager(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater) {
super(context, exec, deletionContext, nodeStatusUpdater);
@@ -101,7 +106,7 @@ public class DummyContainerManager exten
@Override
public void handle(ContainersLauncherEvent event) {
Container container = event.getContainer();
- ContainerID containerId = container.getContainerID();
+ ContainerId containerId = container.getContainerID();
switch (event.getType()) {
case LAUNCH_CONTAINER:
dispatcher.getEventHandler().handle(
@@ -119,19 +124,21 @@ public class DummyContainerManager exten
}
public static void waitForContainerState(ContainerManager containerManager,
- ContainerID containerID, ContainerState finalState)
- throws InterruptedException, AvroRemoteException {
+ ContainerId containerID, ContainerState finalState)
+ throws InterruptedException, YarnRemoteException {
+ GetContainerStatusRequest request = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ request.setContainerId(containerID);
ContainerStatus containerStatus =
- containerManager.getContainerStatus(containerID);
+ containerManager.getContainerStatus(request).getStatus();
int timeoutSecs = 0;
- while (!containerStatus.state.equals(finalState) && timeoutSecs++ < 20) {
+ while (!containerStatus.getState().equals(finalState) && timeoutSecs++ < 20) {
Thread.sleep(1000);
LOG.info("Waiting for container to get into state " + finalState
- + ". Current state is " + containerStatus.state);
- containerStatus = containerManager.getContainerStatus(containerID);
+ + ". Current state is " + containerStatus.getState());
+ containerStatus = containerManager.getContainerStatus(request).getStatus();
}
- LOG.info("Container state is " + containerStatus.state);
+ LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
- finalState, containerStatus.state);
+ finalState, containerStatus.getState());
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/LocalRMInterface.java Thu Mar 31 22:23:22 2011
@@ -18,29 +18,36 @@
package org.apache.hadoop.yarn.server.nodemanager;
-import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.hadoop.yarn.HeartbeatResponse;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.NodeStatus;
-import org.apache.hadoop.yarn.RegistrationResponse;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceTracker;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
public class LocalRMInterface implements ResourceTracker {
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
@Override
- public RegistrationResponse registerNodeManager(CharSequence node,
- Resource resource) throws AvroRemoteException {
- RegistrationResponse registrationResponse = new RegistrationResponse();
- registrationResponse.nodeID = new NodeID();
- return registrationResponse;
+ public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
+ String node = request.getNode();
+ Resource resource = request.getResource();
+ RegistrationResponse registrationResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
+ registrationResponse.setNodeId(recordFactory.newRecordInstance(NodeId.class));
+ RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setRegistrationResponse(registrationResponse);
+ return response;
}
@Override
- public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
- throws AvroRemoteException {
- // TODO Auto-generated method stub
- return null;
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
+ NodeHeartbeatResponse response = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
+ return response;
}
-
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/SyntheticContainerLaunch.java Thu Mar 31 22:23:22 2011
@@ -33,54 +33,55 @@ import org.apache.hadoop.io.DataOutputBu
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
-import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.URL;
-import org.apache.hadoop.yarn.YarnRemoteException;
-import static org.apache.hadoop.yarn.LocalResourceType.*;
-import static org.apache.hadoop.yarn.LocalResourceVisibility.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceType.*;
+import static org.apache.hadoop.yarn.api.records.LocalResourceVisibility.*;
public class SyntheticContainerLaunch {
static final long clusterTimeStamp = System.nanoTime();
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
static ContainerLaunchContext getContainer(Configuration conf,
int appId, int cId, String user, Path tokens)
throws IOException, URISyntaxException {
- ContainerLaunchContext container = new ContainerLaunchContext();
+ ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
// id
- ApplicationID appID = new ApplicationID();
- appID.id = appId;
- appID.clusterTimeStamp = clusterTimeStamp;
- container.id = new ContainerID();
- container.id.appID = appID;
- container.id.id = cId;
+ ApplicationId appID = recordFactory.newRecordInstance(ApplicationId.class);
+ appID.setId(appId);
+ appID.setClusterTimestamp(clusterTimeStamp);
+ container.setContainerId(recordFactory.newRecordInstance(ContainerId.class));
+ container.getContainerId().setAppId(appID);
+ container.getContainerId().setId(cId);
// user
- container.user = user;
+ container.setUser(user);
// Resource resource
- container.resource = new Resource();
- container.resource.memory = 1024;
+ container.setResource(recordFactory.newRecordInstance(Resource.class));
+ container.getResource().setMemory(1024);
// union {null, map<LocalResource>} resources_todo;
- container.resources = new HashMap<CharSequence,LocalResource>();
- LocalResource resource = new LocalResource();
- resource.resource = AvroUtil.getYarnUrlFromPath(
- new Path("file:///home/chrisdo/work/hadoop/mapred/CHANGES.txt"));
- resource.size = -1;
- resource.timestamp = 1294684255000L;
- resource.type = FILE;
- resource.state = PRIVATE;
- container.resources.put("dingos", resource);
+ LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
+ resource.setResource(ConverterUtils.getYarnUrlFromPath(
+ new Path("file:///home/chrisdo/work/hadoop/mapred/CHANGES.txt")));
+ resource.setSize(-1);
+ resource.setTimestamp(1294684255000L);
+ resource.setType(FILE);
+ resource.setVisibility(PRIVATE);
+ container.setLocalResource("dingos", resource);
//union {null, bytes} fsTokens_todo;
Credentials creds = new Credentials();
@@ -89,22 +90,22 @@ public class SyntheticContainerLaunch {
}
DataOutputBuffer buf = new DataOutputBuffer();
creds.writeTokenStorageToStream(buf);
- container.containerTokens =
- ByteBuffer.wrap(buf.getData(), 0, buf.getLength());
+ container.setContainerTokens(
+ ByteBuffer.wrap(buf.getData(), 0, buf.getLength()));
//union {null, map<bytes>} serviceData;
- container.serviceData = new HashMap<CharSequence,ByteBuffer>();
+// container.serviceData = new HashMap<CharSequence,ByteBuffer>();
// map<string> env;
- container.env = new HashMap<CharSequence,CharSequence>();
- container.env.put("MY_OUTPUT_FILE", "yak.txt");
+// container.env = new HashMap<CharSequence,CharSequence>();
+ container.setEnv("MY_OUTPUT_FILE", "yak.txt");
// array<string> command;
- container.command = new ArrayList<CharSequence>();
- container.command.add("cat");
- container.command.add("dingos");
- container.command.add(">");
- container.command.add("${MY_OUTPUT_FILE}");
+// container.command = new ArrayList<CharSequence>();
+ container.addCommand("cat");
+ container.addCommand("dingos");
+ container.addCommand(">");
+ container.addCommand("${MY_OUTPUT_FILE}");
return container;
}
@@ -122,7 +123,9 @@ public class SyntheticContainerLaunch {
ContainerManager client = getClient(conf, nmAddr);
Path tokens = (argv.length > 2) ? new Path(argv[2]) : null;
ContainerLaunchContext ctxt = getContainer(conf, 0, 0, argv[1], tokens);
- client.startContainer(ctxt);
+ StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+ request.setContainerLaunchContext(ctxt);
+ client.startContainer(request);
System.out.println("START: " + ctxt);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java Thu Mar 31 22:23:22 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.nod
import static org.apache.hadoop.fs.CreateFlag.*;
-import org.apache.hadoop.yarn.LocalizationProtocol;
import org.junit.AfterClass;
import org.junit.Test;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Thu Mar 31 22:23:22 2011
@@ -18,32 +18,35 @@
package org.apache.hadoop.yarn.server.nodemanager;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.junit.Test;
public class TestEventFlow {
private static final Log LOG = LogFactory.getLog(TestEventFlow.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@Test
public void testSuccessfulContainerLaunch() throws InterruptedException,
- AvroRemoteException {
+ YarnRemoteException {
Context context = new NMContext();
YarnConfiguration conf = new YarnConfiguration();
@@ -54,13 +57,13 @@ public class TestEventFlow {
NodeStatusUpdater nodeStatusUpdater =
new NodeStatusUpdaterImpl(context, dispatcher, healthChecker) {
@Override
- protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
+ protected ResourceTracker getRMClient() {
return new LocalRMInterface();
};
@Override
protected void startStatusUpdater() throws InterruptedException,
- AvroRemoteException {
+ YarnRemoteException {
return; // Don't start any updating thread.
}
};
@@ -70,20 +73,22 @@ public class TestEventFlow {
containerManager.init(new Configuration());
containerManager.start();
- ContainerLaunchContext launchContext = new ContainerLaunchContext();
- ContainerID cID = new ContainerID();
- cID.appID = new ApplicationID();
- launchContext.id = cID;
- launchContext.user = "testing";
- launchContext.resource = new Resource();
- launchContext.env = new HashMap<CharSequence, CharSequence>();
- launchContext.command = new ArrayList<CharSequence>();
- containerManager.startContainer(launchContext);
+ ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
+ cID.setAppId(recordFactory.newRecordInstance(ApplicationId.class));
+ launchContext.setContainerId(cID);
+ launchContext.setUser("testing");
+ launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
+ StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+ request.setContainerLaunchContext(launchContext);
+ containerManager.startContainer(request);
DummyContainerManager.waitForContainerState(containerManager, cID,
ContainerState.RUNNING);
- containerManager.stopContainer(cID);
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(cID);
+ containerManager.stopContainer(stopRequest);
DummyContainerManager.waitForContainerState(containerManager, cID,
ContainerState.COMPLETE);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Mar 31 22:23:22 2011
@@ -21,26 +21,31 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.concurrent.ConcurrentMap;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.HeartbeatResponse;
-import org.apache.hadoop.yarn.NodeID;
-import org.apache.hadoop.yarn.NodeStatus;
-import org.apache.hadoop.yarn.RegistrationResponse;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.ResourceTracker;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.service.Service.STATE;
@@ -54,6 +59,7 @@ public class TestNodeStatusUpdater {
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
static final Path basedir =
new Path("target", TestNodeStatusUpdater.class.getName());
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
int heartBeatID = 0;
volatile Error nmStartError = null;
@@ -67,8 +73,9 @@ public class TestNodeStatusUpdater {
}
@Override
- public RegistrationResponse registerNodeManager(CharSequence node,
- Resource resource) throws AvroRemoteException {
+ public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
+ String node = request.getNode();
+ Resource resource = request.getResource();
LOG.info("Registering " + node);
try {
Assert.assertEquals(InetAddress.getLocalHost().getHostAddress()
@@ -76,84 +83,88 @@ public class TestNodeStatusUpdater {
} catch (UnknownHostException e) {
Assert.fail(e.getMessage());
}
- Assert.assertEquals(5 * 1024, resource.memory);
- RegistrationResponse regResponse = new RegistrationResponse();
- regResponse.nodeID = new NodeID();
- return regResponse;
+ Assert.assertEquals(5 * 1024, resource.getMemory());
+ RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
+ regResponse.setNodeId(recordFactory.newRecordInstance(NodeId.class));
+
+ RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setRegistrationResponse(regResponse);
+ return response;
}
- ApplicationID applicationID = new ApplicationID();
- ContainerID firstContainerID = new ContainerID();
- ContainerID secondContainerID = new ContainerID();
+ ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class);
+ ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class);
+ ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class);
@Override
- public HeartbeatResponse nodeHeartbeat(NodeStatus nodeStatus)
- throws AvroRemoteException {
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
+ NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
- nodeStatus.responseId = heartBeatID++;
+ nodeStatus.setResponseId(heartBeatID++);
if (heartBeatID == 1) {
- Assert.assertEquals(0, nodeStatus.containers.size());
+ Assert.assertEquals(0, nodeStatus.getAllContainers().size());
// Give a container to the NM.
- applicationID.id = heartBeatID;
- firstContainerID.appID = applicationID;
- firstContainerID.id = heartBeatID;
- ContainerLaunchContext launchContext = new ContainerLaunchContext();
- launchContext.id = firstContainerID;
- launchContext.resource = new Resource();
- launchContext.resource.memory = 2; // 2GB
+ applicationID.setId(heartBeatID);
+ firstContainerID.setAppId(applicationID);
+ firstContainerID.setId(heartBeatID);
+ ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ launchContext.setContainerId(firstContainerID);
+ launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
+ launchContext.getResource().setMemory(2);
Container container = new ContainerImpl(null, launchContext);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
Assert.assertEquals("Number of applications should only be one!", 1,
- nodeStatus.containers.size());
+ nodeStatus.getAllContainers().size());
Assert.assertEquals("Number of container for the app should be one!",
- 1, nodeStatus.containers.get(String.valueOf(applicationID.id))
+ 1, nodeStatus.getContainers(String.valueOf(applicationID.getId()))
.size());
Assert.assertEquals(2,
- nodeStatus.containers.get(String.valueOf(applicationID.id))
- .get(0).resource.memory);
+ nodeStatus.getContainers(String.valueOf(applicationID.getId()))
+ .get(0).getResource().getMemory());
// Checks on the NM end
- ConcurrentMap<ContainerID, Container> activeContainers =
+ ConcurrentMap<ContainerId, Container> activeContainers =
this.context.getContainers();
Assert.assertEquals(1, activeContainers.size());
// Give another container to the NM.
- applicationID.id = heartBeatID;
- secondContainerID.appID = applicationID;
- secondContainerID.id = heartBeatID;
- ContainerLaunchContext launchContext = new ContainerLaunchContext();
- launchContext.id = secondContainerID;
- launchContext.resource = new Resource();
- launchContext.resource.memory = 3; // 3GB
+ applicationID.setId(heartBeatID);
+ secondContainerID.setAppId(applicationID);
+ secondContainerID.setId(heartBeatID);
+ ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ launchContext.setContainerId(secondContainerID);
+ launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
+ launchContext.getResource().setMemory(3);
Container container = new ContainerImpl(null, launchContext);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
Assert.assertEquals("Number of applications should only be one!", 1,
- nodeStatus.containers.size());
+ nodeStatus.getAllContainers().size());
Assert.assertEquals("Number of container for the app should be two!",
- 2, nodeStatus.containers.get(String.valueOf(applicationID.id))
+ 2, nodeStatus.getContainers(String.valueOf(applicationID.getId()))
.size());
Assert.assertEquals(2,
- nodeStatus.containers.get(String.valueOf(applicationID.id))
- .get(0).resource.memory);
+ nodeStatus.getContainers(String.valueOf(applicationID.getId()))
+ .get(0).getResource().getMemory());
Assert.assertEquals(3,
- nodeStatus.containers.get(String.valueOf(applicationID.id))
- .get(1).resource.memory);
+ nodeStatus.getContainers(String.valueOf(applicationID.getId()))
+ .get(1).getResource().getMemory());
// Checks on the NM end
- ConcurrentMap<ContainerID, Container> activeContainers =
+ ConcurrentMap<ContainerId, Container> activeContainers =
this.context.getContainers();
Assert.assertEquals(2, activeContainers.size());
}
- HeartbeatResponse response = new HeartbeatResponse();
- response.responseId = heartBeatID;
- response.containersToCleanup = new ArrayList<org.apache.hadoop.yarn.Container>();
- response.appplicationsToCleanup = new ArrayList<ApplicationID>();
- return response;
+ HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(heartBeatID);
+
+ NodeHeartbeatResponse nhResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
+ nhResponse.setHeartbeatResponse(response);
+ return nhResponse;
}
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRPCFactories.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRPCFactories.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRPCFactories.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,96 @@
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationResponse;
+import org.junit.Test;
+
+public class TestRPCFactories {
+
+
+
+ @Test
+ public void test() {
+ testPbServerFactory();
+
+ testPbClientFactory();
+ }
+
+
+
+ private void testPbServerFactory() {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ Configuration conf = new Configuration();
+ LocalizationProtocol instance = new LocalizationProtocolTestImpl();
+ Server server = null;
+ try {
+ server = RpcServerFactoryPBImpl.get().getServer(LocalizationProtocol.class, instance, addr, conf, null);
+ server.start();
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create server");
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ private void testPbClientFactory() {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ System.err.println(addr.getHostName() + addr.getPort());
+ Configuration conf = new Configuration();
+ LocalizationProtocol instance = new LocalizationProtocolTestImpl();
+ Server server = null;
+ try {
+ server = RpcServerFactoryPBImpl.get().getServer(LocalizationProtocol.class, instance, addr, conf, null);
+ server.start();
+ System.err.println(server.getListenerAddress());
+ System.err.println(NetUtils.getConnectAddress(server));
+
+ LocalizationProtocol client = null;
+ try {
+ client = (LocalizationProtocol) RpcClientFactoryPBImpl.get().getClient(LocalizationProtocol.class, 1, NetUtils.getConnectAddress(server), conf);
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create client");
+ }
+
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create server");
+ } finally {
+ server.stop();
+ }
+ }
+
+ public class LocalizationProtocolTestImpl implements LocalizationProtocol {
+
+ @Override
+ public SuccessfulLocalizationResponse successfulLocalization(
+ SuccessfulLocalizationRequest request) throws YarnRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FailedLocalizationResponse failedLocalization(
+ FailedLocalizationRequest request) throws YarnRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRecordFactory.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRecordFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestRecordFactory.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,32 @@
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.FailedLocalizationRequestPBImpl;
+import org.junit.Test;
+
+/**
+ * Verifies that the protobuf-backed {@link RecordFactory} hands out the
+ * generated PB implementation class for nodemanager protocol record interfaces.
+ */
+public class TestRecordFactory {
+
+  @Test
+  public void testPbRecordFactory() {
+    RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
+
+    try {
+      // Requesting the interface must yield its PB implementation.
+      FailedLocalizationRequest request = pbRecordFactory.newRecordInstance(FailedLocalizationRequest.class);
+      Assert.assertEquals(FailedLocalizationRequestPBImpl.class, request.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create record");
+    }
+  }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Thu Mar 31 22:23:22 2011
@@ -24,13 +24,14 @@ import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.ApplicationID;
import static org.apache.hadoop.yarn.service.Service.STATE.*;
@@ -60,14 +61,14 @@ public class TestAuxServices {
 super.stop();
 }
 @Override
- public void initApp(String user, ApplicationID appId, ByteBuffer data) {
+ public void initApp(String user, ApplicationId appId, ByteBuffer data) {
 assertEquals(idef, data.getChar());
 assertEquals(expected_appId, data.getInt());
- assertEquals(expected_appId, appId.id);
+ assertEquals(expected_appId, appId.getId()); // PB record: public field replaced by getter
 }
 @Override
- public void stopApp(ApplicationID appId) {
+ public void stopApp(ApplicationId appId) {
- assertEquals(expected_appId, appId.id);
+ assertEquals(expected_appId, appId.getId()); // PB record: public field replaced by getter
 }
 }
@@ -93,8 +94,8 @@ public class TestAuxServices {
aux.init(conf);
aux.start();
- ApplicationID appId = new ApplicationID();
- appId.id = 65;
+ ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ appId.setId(65);
ByteBuffer buf = ByteBuffer.allocate(6);
buf.putChar('A');
buf.putInt(65);
@@ -102,7 +103,7 @@ public class TestAuxServices {
AuxServicesEvent event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_INIT, "user0", appId, "Asrv", buf);
aux.handle(event);
- appId.id = 66;
+ appId.setId(66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
}