You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/04/01 18:47:34 UTC
svn commit: r1463203 [3/8] - in
/hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/
hadoop-yarn/bin/ hadoop-yarn/conf/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java Mon Apr 1 16:47:16 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Shell;
/**
* Plugin to calculate resource information on the system.
@@ -31,6 +32,18 @@ import org.apache.hadoop.util.Reflection
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class ResourceCalculatorPlugin extends Configured {
+
+ protected String processPid = null;
+
+ /**
+ * set the pid of the process for which <code>getProcResourceValues</code>
+ * will be invoked
+ *
+ * @param pid process-id of the process whose resource values are to be calculated
+ */
+ public void setProcessPid(String pid) {
+ processPid = pid;
+ }
/**
* Obtain the total size of the virtual memory present in the system.
@@ -109,10 +122,12 @@ public abstract class ResourceCalculator
// No class given, try a os specific class
try {
- String osName = System.getProperty("os.name");
- if (osName.startsWith("Linux")) {
+ if (Shell.LINUX) {
return new LinuxResourceCalculatorPlugin();
}
+ if (Shell.WINDOWS) {
+ return new WindowsResourceCalculatorPlugin();
+ }
} catch (SecurityException se) {
// Failed to get Operating System name.
return null;
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java Mon Apr 1 16:47:16 2013
@@ -145,14 +145,11 @@ public abstract class ResourceCalculator
}
// No class given, try a os specific class
- try {
- String osName = System.getProperty("os.name");
- if (osName.startsWith("Linux")) {
- return new ProcfsBasedProcessTree(pid);
- }
- } catch (SecurityException se) {
- // Failed to get Operating System name.
- return null;
+ if (ProcfsBasedProcessTree.isAvailable()) {
+ return new ProcfsBasedProcessTree(pid);
+ }
+ if (WindowsBasedProcessTree.isAvailable()) {
+ return new WindowsBasedProcessTree(pid);
}
// Not supported on this system.
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java Mon Apr 1 16:47:16 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.webapp.vi
import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+
import com.google.inject.Inject;
@@ -47,7 +51,19 @@ public class InfoBlock extends HtmlBlock
String value = String.valueOf(item.value);
if (item.url == null) {
if (!item.isRaw) {
- tr.td(value);
+ TD<TR<TABLE<DIV<Hamlet>>>> td = tr.td();
+ if ( value.lastIndexOf('\n') > 0) {
+ String []lines = value.split("\n");
+ DIV<TD<TR<TABLE<DIV<Hamlet>>>>> singleLineDiv;
+ for ( String line :lines) {
+ singleLineDiv = td.div();
+ singleLineDiv._r(line);
+ singleLineDiv._();
+ }
+ } else {
+ td._r(value);
+ }
+ td._();
} else {
tr.td()._r(value)._();
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java Mon Apr 1 16:47:16 2013
@@ -107,12 +107,21 @@ public class JQueryUI extends HtmlBlock
protected void initDataTables(List<String> list) {
String defaultInit = "{bJQueryUI: true, sPaginationType: 'full_numbers'}";
+ String stateSaveInit = "bStateSave : true, " +
+ "\"fnStateSave\": function (oSettings, oData) { " +
+ "sessionStorage.setItem( oSettings.sTableId, JSON.stringify(oData) ); }, " +
+ "\"fnStateLoad\": function (oSettings) { " +
+ "return JSON.parse( sessionStorage.getItem(oSettings.sTableId) );}, ";
+
for (String id : split($(DATATABLES_ID))) {
if (Html.isValidId(id)) {
String init = $(initID(DATATABLES, id));
if (init.isEmpty()) {
init = defaultInit;
}
+ // for inserting stateSaveInit
+ int pos = init.indexOf('{') + 1;
+ init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
list.add(join(id,"DataTable = $('#", id, "').dataTable(", init,
").fnSetFilteringDelay(188);"));
String postInit = $(postInitID(DATATABLES, id));
@@ -126,9 +135,12 @@ public class JQueryUI extends HtmlBlock
String init = $(initSelector(DATATABLES));
if (init.isEmpty()) {
init = defaultInit;
- }
+ }
+ int pos = init.indexOf('{') + 1;
+ init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
list.add(join(" $('", escapeJavaScript(selector), "').dataTable(", init,
- ").fnSetFilteringDelay(288);"));
+ ").fnSetFilteringDelay(288);"));
+
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Mon Apr 1 16:47:16 2013
@@ -135,8 +135,12 @@
</property>
<property>
- <description>The maximum number of application master retries.</description>
- <name>yarn.resourcemanager.am.max-retries</name>
+ <description>The maximum number of application attempts. It's a global
+ setting for all application masters. Each application master can specify
+ its individual maximum number of application attempts via the API, but the
+ individual number cannot be more than the global upper bound. If it is,
+ the resourcemanager will override it.</description>
+ <name>yarn.resourcemanager.am.max-attempts</name>
<value>1</value>
</property>
@@ -448,6 +452,20 @@
</property>
<property>
+ <description>Whether physical memory limits will be enforced for
+ containers.</description>
+ <name>yarn.nodemanager.pmem-check-enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <description>Whether virtual memory limits will be enforced for
+ containers.</description>
+ <name>yarn.nodemanager.vmem-check-enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
<description>Ratio between virtual memory to physical memory when
setting memory limits for containers. Container allocations are
expressed in terms of physical memory, and virtual memory usage
@@ -597,6 +615,20 @@
<value>2000</value>
</property>
+ <property>
+ <description>Max time, in seconds, to wait to establish a connection to RM when NM starts.
+ The NM will shut down if it cannot connect to RM within the specified max time period.
+ If the value is set as -1, then NM will retry forever.</description>
+ <name>yarn.nodemanager.resourcemanager.connect.wait.secs</name>
+ <value>900</value>
+ </property>
+
+ <property>
+ <description>Time interval, in seconds, between each NM attempt to connect to RM.</description>
+ <name>yarn.nodemanager.resourcemanager.connect.retry_interval.secs</name>
+ <value>30</value>
+ </property>
+
<!--Map Reduce configuration-->
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr 1 16:47:16 2013
@@ -23,9 +23,9 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.junit.Test;
public class TestRecordFactory {
@@ -35,15 +35,17 @@ public class TestRecordFactory {
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
try {
- AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
- Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
+ AllocateResponse response =
+ pbRecordFactory.newRecordInstance(AllocateResponse.class);
+ Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();
Assert.fail("Failed to crete record");
}
try {
- AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
+ AllocateRequest response =
+ pbRecordFactory.newRecordInstance(AllocateRequest.class);
Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Mon Apr 1 16:47:16 2013
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEqu
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.HashMap;
@@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -113,7 +116,127 @@ public class TestFSDownload {
return ret;
}
- @Test
+ static LocalResource createTarFile(FileContext files, Path p, int len,
+ Random r, LocalResourceVisibility vis) throws IOException,
+ URISyntaxException {
+
+ FSDataOutputStream outFile = null;
+ try {
+ byte[] bytes = new byte[len];
+ Path tarPath = new Path(p.toString());
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+ r.nextBytes(bytes);
+ outFile.write(bytes);
+ } finally {
+ if (outFile != null)
+ outFile.close();
+ }
+ StringBuffer tarCommand = new StringBuffer();
+ URI u = new URI(p.getParent().toString());
+ tarCommand.append("cd '");
+ tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+ tarCommand.append("' ; ");
+ tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName());
+ String[] shellCmd = { "bash", "-c", tarCommand.toString() };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+ shexec.execute();
+ int exitcode = shexec.getExitCode();
+ if (exitcode != 0) {
+ throw new IOException("Error tarring file " + p
+ + ". Tar process exited with exit code " + exitcode);
+ }
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+ + ".tar")));
+ ret.setSize(len);
+ ret.setType(LocalResourceType.ARCHIVE);
+ ret.setVisibility(vis);
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar"))
+ .getModificationTime());
+ return ret;
+ }
+
+ static LocalResource createJarFile(FileContext files, Path p, int len,
+ Random r, LocalResourceVisibility vis) throws IOException,
+ URISyntaxException {
+
+ FSDataOutputStream outFile = null;
+ try {
+ byte[] bytes = new byte[len];
+ Path tarPath = new Path(p.toString());
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+ r.nextBytes(bytes);
+ outFile.write(bytes);
+ } finally {
+ if (outFile != null)
+ outFile.close();
+ }
+ StringBuffer tarCommand = new StringBuffer();
+ URI u = new URI(p.getParent().toString());
+ tarCommand.append("cd '");
+ tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+ tarCommand.append("' ; ");
+ tarCommand.append("jar cf " + p.getName() + ".jar " + p.getName());
+ String[] shellCmd = { "bash", "-c", tarCommand.toString() };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+ shexec.execute();
+ int exitcode = shexec.getExitCode();
+ if (exitcode != 0) {
+ throw new IOException("Error creating jar file " + p
+ + ". Jar process exited with exit code " + exitcode);
+ }
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+ + ".jar")));
+ ret.setSize(len);
+ ret.setType(LocalResourceType.ARCHIVE);
+ ret.setVisibility(vis);
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar"))
+ .getModificationTime());
+ return ret;
+ }
+
+ static LocalResource createZipFile(FileContext files, Path p, int len,
+ Random r, LocalResourceVisibility vis) throws IOException,
+ URISyntaxException {
+
+ FSDataOutputStream outFile = null;
+ try {
+ byte[] bytes = new byte[len];
+ Path tarPath = new Path(p.toString());
+ outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+ r.nextBytes(bytes);
+ outFile.write(bytes);
+ } finally {
+ if (outFile != null)
+ outFile.close();
+ }
+ StringBuffer zipCommand = new StringBuffer();
+ URI u = new URI(p.getParent().toString());
+ zipCommand.append("cd '");
+ zipCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+ zipCommand.append("' ; ");
+ zipCommand.append("gzip " + p.getName());
+ String[] shellCmd = { "bash", "-c", zipCommand.toString() };
+ ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+ shexec.execute();
+ int exitcode = shexec.getExitCode();
+ if (exitcode != 0) {
+ throw new IOException("Error gzipping file " + p
+ + ". Gzip process exited with exit code " + exitcode);
+ }
+ LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+ + ".gz")));
+ ret.setSize(len);
+ ret.setType(LocalResourceType.ARCHIVE);
+ ret.setVisibility(vis);
+ ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz"))
+ .getModificationTime());
+ return ret;
+ }
+
+ @Test (timeout=10000)
public void testDownloadBadPublic() throws IOException, URISyntaxException,
InterruptedException {
Configuration conf = new Configuration();
@@ -161,7 +284,7 @@ public class TestFSDownload {
}
}
- @Test
+ @Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException,
InterruptedException {
Configuration conf = new Configuration();
@@ -229,6 +352,175 @@ public class TestFSDownload {
}
}
+ @SuppressWarnings("deprecation")
+ @Test (timeout=10000)
+ public void testDownloadArchive() throws IOException, URISyntaxException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ Random rand = new Random();
+ long sharedSeed = rand.nextLong();
+ rand.setSeed(sharedSeed);
+ System.out.println("SEED: " + sharedSeed);
+
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ LocalDirAllocator dirs = new LocalDirAllocator(
+ TestFSDownload.class.getName());
+
+ int size = rand.nextInt(512) + 512;
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+ Path p = new Path(basedir, "" + 1);
+ LocalResource rsrc = createTarFile(files, p, size, rand, vis);
+ Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ FSDownload fsd = new FSDownload(files,
+ UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
+ new Random(sharedSeed));
+ pending.put(rsrc, exec.submit(fsd));
+
+ try {
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+ basedir);
+ for (FileStatus filestatus : filesstatus) {
+ if (filestatus.isDir()) {
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+ filestatus.getPath());
+ for (FileStatus childfile : childFiles) {
+ if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) {
+ Assert.fail("Tmp File should not have been there "
+ + childfile.getPath());
+ }
+ }
+ }
+ }
+ }catch (Exception e) {
+ throw new IOException("Failed exec", e);
+ }
+ finally {
+ exec.shutdown();
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test (timeout=10000)
+ public void testDownloadPatternJar() throws IOException, URISyntaxException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ Random rand = new Random();
+ long sharedSeed = rand.nextLong();
+ rand.setSeed(sharedSeed);
+ System.out.println("SEED: " + sharedSeed);
+
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ LocalDirAllocator dirs = new LocalDirAllocator(
+ TestFSDownload.class.getName());
+
+ int size = rand.nextInt(512) + 512;
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+ Path p = new Path(basedir, "" + 1);
+ LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
+ rsrcjar.setType(LocalResourceType.PATTERN);
+ Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ FSDownload fsdjar = new FSDownload(files,
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
+ new Random(sharedSeed));
+ pending.put(rsrcjar, exec.submit(fsdjar));
+
+ try {
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+ basedir);
+ for (FileStatus filestatus : filesstatus) {
+ if (filestatus.isDir()) {
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+ filestatus.getPath());
+ for (FileStatus childfile : childFiles) {
+ if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) {
+ Assert.fail("Tmp File should not have been there "
+ + childfile.getPath());
+ }
+ }
+ }
+ }
+ }catch (Exception e) {
+ throw new IOException("Failed exec", e);
+ }
+ finally {
+ exec.shutdown();
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test (timeout=10000)
+ public void testDownloadArchiveZip() throws IOException, URISyntaxException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+ FileContext files = FileContext.getLocalFSFileContext(conf);
+ final Path basedir = files.makeQualified(new Path("target",
+ TestFSDownload.class.getSimpleName()));
+ files.mkdir(basedir, null, true);
+ conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+ Random rand = new Random();
+ long sharedSeed = rand.nextLong();
+ rand.setSeed(sharedSeed);
+ System.out.println("SEED: " + sharedSeed);
+
+ Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ LocalDirAllocator dirs = new LocalDirAllocator(
+ TestFSDownload.class.getName());
+
+ int size = rand.nextInt(512) + 512;
+ LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+ Path p = new Path(basedir, "" + 1);
+ LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
+ Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+ FSDownload fsdzip = new FSDownload(files,
+ UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
+ new Random(sharedSeed));
+ pending.put(rsrczip, exec.submit(fsdzip));
+
+ try {
+ FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+ basedir);
+ for (FileStatus filestatus : filesstatus) {
+ if (filestatus.isDir()) {
+ FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+ filestatus.getPath());
+ for (FileStatus childfile : childFiles) {
+ if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) {
+ Assert.fail("Tmp File should not have been there "
+ + childfile.getPath());
+ }
+ }
+ }
+ }
+ }catch (Exception e) {
+ throw new IOException("Failed exec", e);
+ }
+ finally {
+ exec.shutdown();
+ }
+ }
+
private void verifyPermsRecursively(FileSystem fs,
FileContext files, Path p,
LocalResourceVisibility vis) throws IOException {
@@ -261,7 +553,7 @@ public class TestFSDownload {
}
}
- @Test
+ @Test (timeout=10000)
public void testDirDownload() throws IOException, InterruptedException {
Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java Mon Apr 1 16:47:16 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.util;
+import static org.junit.Assert.fail;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
@@ -104,17 +107,21 @@ public class TestProcfsBasedProcessTree
new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
}
- @Test
+ @Test (timeout = 30000)
public void testProcessTree() throws Exception {
+ if (!Shell.LINUX) {
+ System.out
+ .println("ProcfsBasedProcessTree is not available on this system. Not testing");
+ return;
+
+ }
try {
- if (!ProcfsBasedProcessTree.isAvailable()) {
- System.out
- .println("ProcfsBasedProcessTree is not available on this system. Not testing");
- return;
- }
+ Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
} catch (Exception e) {
LOG.info(StringUtils.stringifyException(e));
+ Assert.assertTrue("ProcfsBaseProcessTree should be available on Linux",
+ false);
return;
}
// create shell script
@@ -183,11 +190,20 @@ public class TestProcfsBasedProcessTree
// destroy the process and all its subprocesses
destroyProcessTree(pid);
- if (isSetsidAvailable()) { // whole processtree should be gone
- Assert.assertFalse("Proceesses in process group live",
- isAnyProcessInTreeAlive(p));
- } else {// process should be gone
- Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
+ boolean isAlive = true;
+ for (int tries = 100; tries > 0; tries--) {
+ if (isSetsidAvailable()) {// whole processtree
+ isAlive = isAnyProcessInTreeAlive(p);
+ } else {// process
+ isAlive = isAlive(pid);
+ }
+ if (!isAlive) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ if (isAlive) {
+ fail("ProcessTree shouldn't be alive");
}
LOG.info("Process-tree dump follows: \n" + processTreeDump);
@@ -328,7 +344,7 @@ public class TestProcfsBasedProcessTree
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
- @Test
+ @Test (timeout = 30000)
public void testCpuAndMemoryForProcessTree() throws IOException {
// test processes
@@ -402,7 +418,7 @@ public class TestProcfsBasedProcessTree
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
- @Test
+ @Test (timeout = 30000)
public void testMemForOlderProcesses() throws IOException {
// initial list of processes
String[] pids = { "100", "200", "300", "400" };
@@ -509,7 +525,7 @@ public class TestProcfsBasedProcessTree
* @throws IOException if there was a problem setting up the
* fake procfs directories or files.
*/
- @Test
+ @Test (timeout = 30000)
public void testDestroyProcessTree() throws IOException {
// test process
String pid = "100";
@@ -535,7 +551,7 @@ public class TestProcfsBasedProcessTree
*
* @throws IOException
*/
- @Test
+ @Test (timeout = 30000)
public void testProcessTreeDump()
throws IOException {
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java Mon Apr 1 16:47:16 2013
@@ -18,9 +18,13 @@
package org.apache.hadoop.yarn.util;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -30,9 +34,12 @@ import org.junit.Test;
public class TestRackResolver {
+ private static Log LOG = LogFactory.getLog(TestRackResolver.class);
+
public static final class MyResolver implements DNSToSwitchMapping {
int numHost1 = 0;
+ public static String resolvedHost1 = "host1";
@Override
public List<String> resolve(List<String> hostList) {
@@ -43,7 +50,10 @@ public class TestRackResolver {
if (hostList.isEmpty()) {
return returnList;
}
- if (hostList.get(0).equals("host1")) {
+ LOG.info("Received resolve request for "
+ + hostList.get(0));
+ if (hostList.get(0).equals("host1")
+ || hostList.get(0).equals(resolvedHost1)) {
numHost1++;
returnList.add("/rack1");
}
@@ -53,6 +63,10 @@ public class TestRackResolver {
return returnList;
}
+ @Override
+ public void reloadCachedMappings() {
+ // nothing to do here, since RawScriptBasedMapping has no cache.
+ }
}
@Test
@@ -62,6 +76,12 @@ public class TestRackResolver {
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
+ try {
+ InetAddress iaddr = InetAddress.getByName("host1");
+ MyResolver.resolvedHost1 = iaddr.getHostAddress();
+ } catch (UnknownHostException e) {
+ // Ignore if not found
+ }
Node node = RackResolver.resolve("host1");
Assert.assertEquals("/rack1", node.getNetworkLocation());
node = RackResolver.resolve("host1");
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java Mon Apr 1 16:47:16 2013
@@ -18,10 +18,28 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
public interface NodeHeartbeatResponse {
- public abstract HeartbeatResponse getHeartbeatResponse();
+ int getResponseId();
+ NodeAction getNodeAction();
+
+ List<ContainerId> getContainersToCleanup();
+
+ List<ApplicationId> getApplicationsToCleanup();
+
+ void setResponseId(int responseId);
+ void setNodeAction(NodeAction action);
+
+ MasterKey getMasterKey();
+ void setMasterKey(MasterKey secretKey);
+
+ void addAllContainersToCleanup(List<ContainerId> containers);
- public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse);
+ void addAllApplicationsToCleanup(List<ApplicationId> applications);
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java Mon Apr 1 16:47:16 2013
@@ -18,11 +18,16 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
public interface RegisterNodeManagerResponse {
- public abstract RegistrationResponse getRegistrationResponse();
-
- public abstract void setRegistrationResponse(RegistrationResponse registrationResponse);
+ MasterKey getMasterKey();
+
+ void setMasterKey(MasterKey secretKey);
+
+ NodeAction getNodeAction();
+
+ void setNodeAction(NodeAction nodeAction);
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java Mon Apr 1 16:47:16 2013
@@ -18,14 +18,25 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -34,8 +45,9 @@ public class NodeHeartbeatResponsePBImpl
NodeHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
- private HeartbeatResponse heartbeatResponse = null;
-
+ private List<ContainerId> containersToCleanup = null;
+ private List<ApplicationId> applicationsToCleanup = null;
+ private MasterKey masterKey = null;
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
@@ -54,8 +66,14 @@ public class NodeHeartbeatResponsePBImpl
}
private void mergeLocalToBuilder() {
- if (this.heartbeatResponse != null) {
- builder.setHeartbeatResponse(convertToProtoFormat(this.heartbeatResponse));
+ if (this.containersToCleanup != null) {
+ addContainersToCleanupToProto();
+ }
+ if (this.applicationsToCleanup != null) {
+ addApplicationsToCleanupToProto();
+ }
+ if (this.masterKey != null) {
+ builder.setMasterKey(convertToProtoFormat(this.masterKey));
}
}
@@ -76,34 +94,213 @@ public class NodeHeartbeatResponsePBImpl
@Override
- public HeartbeatResponse getHeartbeatResponse() {
+ public int getResponseId() {
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getResponseId());
+ }
+
+ @Override
+ public void setResponseId(int responseId) {
+ maybeInitBuilder();
+ builder.setResponseId((responseId));
+ }
+
+ @Override
+ public MasterKey getMasterKey() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (this.heartbeatResponse != null) {
- return this.heartbeatResponse;
+ if (this.masterKey != null) {
+ return this.masterKey;
}
- if (!p.hasHeartbeatResponse()) {
+ if (!p.hasMasterKey()) {
return null;
}
- this.heartbeatResponse = convertFromProtoFormat(p.getHeartbeatResponse());
- return this.heartbeatResponse;
+ this.masterKey = convertFromProtoFormat(p.getMasterKey());
+ return this.masterKey;
+ }
+
+ @Override
+ public void setMasterKey(MasterKey masterKey) {
+ maybeInitBuilder();
+ if (masterKey == null)
+ builder.clearMasterKey();
+ this.masterKey = masterKey;
+ }
+
+ @Override
+ public NodeAction getNodeAction() {
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeAction()) {
+ return null;
+ }
+ return (convertFromProtoFormat(p.getNodeAction()));
+ }
+
+ @Override
+ public void setNodeAction(NodeAction nodeAction) {
+ maybeInitBuilder();
+ if (nodeAction == null) {
+ builder.clearNodeAction();
+ return;
+ }
+ builder.setNodeAction(convertToProtoFormat(nodeAction));
+ }
+
+ @Override
+ public List<ContainerId> getContainersToCleanup() {
+ initContainersToCleanup();
+ return this.containersToCleanup;
+ }
+
+ private void initContainersToCleanup() {
+ if (this.containersToCleanup != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerIdProto> list = p.getContainersToCleanupList();
+ this.containersToCleanup = new ArrayList<ContainerId>();
+
+ for (ContainerIdProto c : list) {
+ this.containersToCleanup.add(convertFromProtoFormat(c));
+ }
}
@Override
- public void setHeartbeatResponse(HeartbeatResponse heartbeatResponse) {
+ public void addAllContainersToCleanup(
+ final List<ContainerId> containersToCleanup) {
+ if (containersToCleanup == null)
+ return;
+ initContainersToCleanup();
+ this.containersToCleanup.addAll(containersToCleanup);
+ }
+
+ private void addContainersToCleanupToProto() {
maybeInitBuilder();
- if (heartbeatResponse == null)
- builder.clearHeartbeatResponse();
- this.heartbeatResponse = heartbeatResponse;
+ builder.clearContainersToCleanup();
+ if (containersToCleanup == null)
+ return;
+ Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
+
+ @Override
+ public Iterator<ContainerIdProto> iterator() {
+ return new Iterator<ContainerIdProto>() {
+
+ Iterator<ContainerId> iter = containersToCleanup.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllContainersToCleanup(iterable);
+ }
+
+ @Override
+ public List<ApplicationId> getApplicationsToCleanup() {
+ initApplicationsToCleanup();
+ return this.applicationsToCleanup;
}
- private HeartbeatResponsePBImpl convertFromProtoFormat(HeartbeatResponseProto p) {
- return new HeartbeatResponsePBImpl(p);
+ private void initApplicationsToCleanup() {
+ if (this.applicationsToCleanup != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
+ this.applicationsToCleanup = new ArrayList<ApplicationId>();
+
+ for (ApplicationIdProto c : list) {
+ this.applicationsToCleanup.add(convertFromProtoFormat(c));
+ }
}
- private HeartbeatResponseProto convertToProtoFormat(HeartbeatResponse t) {
- return ((HeartbeatResponsePBImpl)t).getProto();
+ @Override
+ public void addAllApplicationsToCleanup(
+ final List<ApplicationId> applicationsToCleanup) {
+ if (applicationsToCleanup == null)
+ return;
+ initApplicationsToCleanup();
+ this.applicationsToCleanup.addAll(applicationsToCleanup);
}
+ private void addApplicationsToCleanupToProto() {
+ maybeInitBuilder();
+ builder.clearApplicationsToCleanup();
+ if (applicationsToCleanup == null)
+ return;
+ Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
+
+ @Override
+ public Iterator<ApplicationIdProto> iterator() {
+ return new Iterator<ApplicationIdProto>() {
+
+ Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllApplicationsToCleanup(iterable);
+ }
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ private NodeAction convertFromProtoFormat(NodeActionProto p) {
+ return NodeAction.valueOf(p.name());
+ }
+
+ private NodeActionProto convertToProtoFormat(NodeAction t) {
+ return NodeActionProto.valueOf(t.name());
+ }
+
+ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
+ return new MasterKeyPBImpl(p);
+ }
+
+ private MasterKeyProto convertToProtoFormat(MasterKey t) {
+ return ((MasterKeyPBImpl) t).getProto();
+ }
+}
-}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java Mon Apr 1 16:47:16 2013
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.ap
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.RegistrationResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -34,7 +36,7 @@ public class RegisterNodeManagerResponse
RegisterNodeManagerResponseProto.Builder builder = null;
boolean viaProto = false;
- private RegistrationResponse registartionResponse = null;
+ private MasterKey masterKey = null;
private boolean rebuild = false;
@@ -56,9 +58,8 @@ public class RegisterNodeManagerResponse
}
private void mergeLocalToBuilder() {
- if (this.registartionResponse != null) {
- builder.setRegistrationResponse(convertToProtoFormat(this.registartionResponse));
- this.registartionResponse = null;
+ if (this.masterKey != null) {
+ builder.setMasterKey(convertToProtoFormat(this.masterKey));
}
}
@@ -77,39 +78,60 @@ public class RegisterNodeManagerResponse
}
viaProto = false;
}
-
-
+
@Override
- public RegistrationResponse getRegistrationResponse() {
+ public MasterKey getMasterKey() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
- if (this.registartionResponse != null) {
- return this.registartionResponse;
+ if (this.masterKey != null) {
+ return this.masterKey;
}
- if (!p.hasRegistrationResponse()) {
+ if (!p.hasMasterKey()) {
return null;
}
- this.registartionResponse = convertFromProtoFormat(p.getRegistrationResponse());
- rebuild = true;
- return this.registartionResponse;
+ this.masterKey = convertFromProtoFormat(p.getMasterKey());
+ return this.masterKey;
}
@Override
- public void setRegistrationResponse(RegistrationResponse registrationResponse) {
+ public void setMasterKey(MasterKey masterKey) {
maybeInitBuilder();
- if (registrationResponse == null)
- builder.clearRegistrationResponse();
- this.registartionResponse = registrationResponse;
- rebuild = true;
+ if (masterKey == null)
+ builder.clearMasterKey();
+ this.masterKey = masterKey;
}
- private RegistrationResponsePBImpl convertFromProtoFormat(RegistrationResponseProto p) {
- return new RegistrationResponsePBImpl(p);
+ @Override
+ public NodeAction getNodeAction() {
+ RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if(!p.hasNodeAction()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getNodeAction());
}
- private RegistrationResponseProto convertToProtoFormat(RegistrationResponse t) {
- return ((RegistrationResponsePBImpl)t).getProto();
+ @Override
+ public void setNodeAction(NodeAction nodeAction) {
+ maybeInitBuilder();
+ if (nodeAction == null) {
+ builder.clearNodeAction();
+ return;
+ }
+ builder.setNodeAction(convertToProtoFormat(nodeAction));
}
+ private NodeAction convertFromProtoFormat(NodeActionProto p) {
+ return NodeAction.valueOf(p.name());
+ }
+ private NodeActionProto convertToProtoFormat(NodeAction t) {
+ return NodeActionProto.valueOf(t.name());
+ }
+ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
+ return new MasterKeyPBImpl(p);
+ }
+
+ private MasterKeyProto convertToProtoFormat(MasterKey t) {
+ return ((MasterKeyPBImpl)t).getProto();
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Mon Apr 1 16:47:16 2013
@@ -42,16 +42,3 @@ message MasterKeyProto {
optional bytes bytes = 2;
}
-message RegistrationResponseProto {
- optional MasterKeyProto master_key = 1;
- optional NodeActionProto nodeAction = 2;
-}
-
-message HeartbeatResponseProto {
- optional int32 response_id = 1;
- optional MasterKeyProto master_key = 2;
- optional NodeActionProto nodeAction = 3;
- repeated ContainerIdProto containers_to_cleanup = 4;
- repeated ApplicationIdProto applications_to_cleanup = 5;
-}
-
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Mon Apr 1 16:47:16 2013
@@ -29,8 +29,10 @@ message RegisterNodeManagerRequestProto
optional int32 http_port = 3;
optional ResourceProto resource = 4;
}
+
message RegisterNodeManagerResponseProto {
- optional RegistrationResponseProto registration_response = 1;
+ optional MasterKeyProto master_key = 1;
+ optional NodeActionProto nodeAction = 2;
}
message NodeHeartbeatRequestProto {
@@ -38,6 +40,11 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_master_key = 2;
}
+
message NodeHeartbeatResponseProto {
- optional HeartbeatResponseProto heartbeat_response = 1;
+ optional int32 response_id = 1;
+ optional MasterKeyProto master_key = 2;
+ optional NodeActionProto nodeAction = 3;
+ repeated ContainerIdProto containers_to_cleanup = 4;
+ repeated ApplicationIdProto applications_to_cleanup = 5;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr 1 16:47:16 2013
@@ -25,8 +25,6 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
import org.junit.Test;
public class TestRecordFactory {
@@ -34,15 +32,6 @@ public class TestRecordFactory {
@Test
public void testPbRecordFactory() {
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
-
- try {
- HeartbeatResponse response = pbRecordFactory.newRecordInstance(HeartbeatResponse.class);
- Assert.assertEquals(HeartbeatResponsePBImpl.class, response.getClass());
- } catch (YarnException e) {
- e.printStackTrace();
- Assert.fail("Failed to crete record");
- }
-
try {
NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class);
Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass());
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Mon Apr 1 16:47:16 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -35,8 +37,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
+import org.apache.hadoop.util.Shell;
public abstract class ContainerExecutor implements Configurable {
@@ -182,6 +186,54 @@ public abstract class ContainerExecutor
readLock.unlock();
}
}
+
+ /**
+ * Return a command to execute the given command in OS shell.
+ * On Windows, the passed in groupId can be used to launch
+ * and associate the given groupId in a process group. On
+ * non-Windows, groupId is ignored.
+ */
+ protected static String[] getRunCommand(String command, String groupId,
+ Configuration conf) {
+ boolean containerSchedPriorityIsSet = false;
+ int containerSchedPriorityAdjustment =
+ YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
+
+ if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
+ null) {
+ containerSchedPriorityIsSet = true;
+ containerSchedPriorityAdjustment = conf
+ .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+ }
+
+ if (Shell.WINDOWS) {
+ return new String[] { Shell.WINUTILS, "task", "create", groupId,
+ "cmd /c " + command };
+ } else {
+ List<String> retCommand = new ArrayList<String>();
+ if (containerSchedPriorityIsSet) {
+ retCommand.addAll(Arrays.asList("nice", "-n",
+ Integer.toString(containerSchedPriorityAdjustment)));
+ }
+ retCommand.addAll(Arrays.asList("bash", "-c", command));
+ return retCommand.toArray(new String[retCommand.size()]);
+ }
+
+ }
+
+ /** Return a command for determining if process with specified pid is alive. */
+ protected static String[] getCheckProcessIsAliveCommand(String pid) {
+ return Shell.WINDOWS ?
+ new String[] { Shell.WINUTILS, "task", "isAlive", pid } :
+ new String[] { "kill", "-0", pid };
+ }
+
+ /** Return a command to send a signal to a given pid */
+ protected static String[] getSignalKillCommand(int code, String pid) {
+ return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
+ new String[] { "kill", "-" + code, pid };
+ }
/**
* Is the container still active?
@@ -253,6 +305,9 @@ public abstract class ContainerExecutor
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
+ if (Shell.WINDOWS) {
+ return true;
+ }
ShellCommandExecutor shexec = null;
boolean setsidSupported = true;
try {
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Apr 1 16:47:16 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +55,9 @@ public class DefaultContainerExecutor ex
private static final Log LOG = LogFactory
.getLog(DefaultContainerExecutor.class);
- private final FileContext lfs;
+ private static final int WIN_MAX_PATH = 260;
- private static final String WRAPPER_LAUNCH_SCRIPT =
- "default_container_executor.sh";
+ private final FileContext lfs;
public DefaultContainerExecutor() {
try {
@@ -145,15 +146,24 @@ public class DefaultContainerExecutor ex
lfs.util().copy(nmPrivateTokensPath, tokenDst);
// Create new local launch wrapper script
- Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
- DataOutputStream wrapperScriptOutStream =
- lfs.create(wrapperScriptDst,
- EnumSet.of(CREATE, OVERWRITE));
+ LocalWrapperScriptBuilder sb = Shell.WINDOWS ?
+ new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
+ new UnixLocalWrapperScriptBuilder(containerWorkDir);
+
+ // Fail fast if attempting to launch the wrapper script would fail due to
+ // Windows path length limitation.
+ if (Shell.WINDOWS &&
+ sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
+ throw new IOException(String.format(
+ "Cannot launch container using script at path %s, because it exceeds " +
+ "the maximum supported path length of %d characters. Consider " +
+ "configuring shorter directories in %s.", sb.getWrapperScriptPath(),
+ WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
+ }
Path pidFile = getPidFilePath(containerId);
if (pidFile != null) {
- writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
- .getPath().toString(), pidFile.toString());
+ sb.writeLocalWrapperScript(launchDst, pidFile);
} else {
LOG.info("Container " + containerIdStr
+ " was marked as inactive. Returning terminated error");
@@ -166,12 +176,13 @@ public class DefaultContainerExecutor ex
try {
lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
- lfs.setPermission(wrapperScriptDst,
+ lfs.setPermission(sb.getWrapperScriptPath(),
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
// Setup command to run
- String[] command = {"bash",
- wrapperScriptDst.toUri().getPath().toString()};
+ String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
+ containerIdStr, this.getConf());
+
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(
command,
@@ -202,28 +213,85 @@ public class DefaultContainerExecutor ex
return 0;
}
- private void writeLocalWrapperScript(DataOutputStream out,
- String launchScriptDst, String pidFilePath) throws IOException {
- // We need to do a move as writing to a file is not atomic
- // Process reading a file being written to may get garbled data
- // hence write pid to tmp file first followed by a mv
- StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
- sb.append("echo $$ > " + pidFilePath + ".tmp\n");
- sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
- sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
- sb.append(" /bin/bash ");
- sb.append("\"");
- sb.append(launchScriptDst);
- sb.append("\"\n");
- PrintStream pout = null;
- try {
- pout = new PrintStream(out);
- pout.append(sb);
- } finally {
- if (out != null) {
- out.close();
+ private abstract class LocalWrapperScriptBuilder {
+
+ private final Path wrapperScriptPath;
+
+ public Path getWrapperScriptPath() {
+ return wrapperScriptPath;
+ }
+
+ public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
+ DataOutputStream out = null;
+ PrintStream pout = null;
+
+ try {
+ out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
+ pout = new PrintStream(out);
+ writeLocalWrapperScript(launchDst, pidFile, pout);
+ } finally {
+ IOUtils.cleanup(LOG, pout, out);
}
}
+
+ protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
+ PrintStream pout);
+
+ protected LocalWrapperScriptBuilder(Path wrapperScriptPath) {
+ this.wrapperScriptPath = wrapperScriptPath;
+ }
+ }
+
+ private final class UnixLocalWrapperScriptBuilder
+ extends LocalWrapperScriptBuilder {
+
+ public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
+ super(new Path(containerWorkDir, "default_container_executor.sh"));
+ }
+
+ @Override
+ public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+ PrintStream pout) {
+
+ // We need to do a move as writing to a file is not atomic
+ // Process reading a file being written to may get garbled data
+ // hence write pid to tmp file first followed by a mv
+ pout.println("#!/bin/bash");
+ pout.println();
+ pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+ pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+ String exec = ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec";
+ pout.println(exec + " /bin/bash -c \"" +
+ launchDst.toUri().getPath().toString() + "\"");
+ }
+ }
+
+ private final class WindowsLocalWrapperScriptBuilder
+ extends LocalWrapperScriptBuilder {
+
+ private final String containerIdStr;
+
+ public WindowsLocalWrapperScriptBuilder(String containerIdStr,
+ Path containerWorkDir) {
+
+ super(new Path(containerWorkDir, "default_container_executor.cmd"));
+ this.containerIdStr = containerIdStr;
+ }
+
+ @Override
+ public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+ PrintStream pout) {
+
+ // On Windows, the pid is the container ID, so that it can also serve as
+ // the name of the job object created by winutils for task management.
+ // Write to temp file followed by atomic move.
+ String normalizedPidFile = new File(pidFile.toString()).getPath();
+ pout.println("@echo " + containerIdStr + " > " + normalizedPidFile +
+ ".tmp");
+ pout.println("@move /Y " + normalizedPidFile + ".tmp " +
+ normalizedPidFile);
+ pout.println("@call " + launchDst.toString());
+ }
}
@Override
@@ -234,17 +302,13 @@ public class DefaultContainerExecutor ex
: pid;
LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+ " as user " + user);
- try {
- sendSignal(sigpid, Signal.NULL);
- } catch (ExitCodeException e) {
+ if (!containerIsAlive(sigpid)) {
return false;
}
try {
- sendSignal(sigpid, signal);
+ killContainer(sigpid, signal);
} catch (IOException e) {
- try {
- sendSignal(sigpid, Signal.NULL);
- } catch (IOException ignore) {
+ if (!containerIsAlive(sigpid)) {
return false;
}
throw e;
@@ -253,17 +317,33 @@ public class DefaultContainerExecutor ex
}
/**
+ * Returns true if the process with the specified pid is alive.
+ *
+ * @param pid String pid
+ * @return boolean true if the process is alive
+ */
+ private boolean containerIsAlive(String pid) throws IOException {
+ try {
+ new ShellCommandExecutor(getCheckProcessIsAliveCommand(pid)).execute();
+ // successful execution means process is alive
+ return true;
+ }
+ catch (ExitCodeException e) {
+ // failure (non-zero exit code) means process is not alive
+ return false;
+ }
+ }
+
+ /**
* Send a specified signal to the specified pid
*
* @param pid the pid of the process [group] to signal.
* @param signal signal to send
* (for logging).
*/
- protected void sendSignal(String pid, Signal signal) throws IOException {
- ShellCommandExecutor shexec = null;
- String[] arg = { "kill", "-" + signal.getValue(), pid };
- shexec = new ShellCommandExecutor(arg);
- shexec.execute();
+ private void killContainer(String pid, Signal signal) throws IOException {
+ new ShellCommandExecutor(getSignalKillCommand(signal.getValue(), pid))
+ .execute();
}
@Override
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Mon Apr 1 16:47:16 2013
@@ -50,6 +50,8 @@ public class LinuxContainerExecutor exte
private String containerExecutorExe;
private LCEResourcesHandler resourcesHandler;
+ private boolean containerSchedPriorityIsSet = false;
+ private int containerSchedPriorityAdjustment = 0;
@Override
@@ -61,6 +63,13 @@ public class LinuxContainerExecutor exte
conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
resourcesHandler.setConf(conf);
+
+ if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
+ containerSchedPriorityIsSet = true;
+ containerSchedPriorityAdjustment = conf
+ .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+ }
}
/**
@@ -114,6 +123,13 @@ public class LinuxContainerExecutor exte
: conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
}
+ protected void addSchedPriorityCommand(List<String> command) {
+ if (containerSchedPriorityIsSet) {
+ command.addAll(Arrays.asList("nice", "-n",
+ Integer.toString(containerSchedPriorityAdjustment)));
+ }
+ }
+
@Override
public void init() throws IOException {
// Send command to executor which will just start up,
@@ -145,14 +161,15 @@ public class LinuxContainerExecutor exte
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {
- List<String> command = new ArrayList<String>(
- Arrays.asList(containerExecutorExe,
- user,
- Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
- appId,
- nmPrivateContainerTokensPath.toUri().getPath().toString(),
- StringUtils.join(",", localDirs),
- StringUtils.join(",", logDirs)));
+ List<String> command = new ArrayList<String>();
+ addSchedPriorityCommand(command);
+ command.addAll(Arrays.asList(containerExecutorExe,
+ user,
+ Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
+ appId,
+ nmPrivateContainerTokensPath.toUri().getPath().toString(),
+ StringUtils.join(",", localDirs),
+ StringUtils.join(",", logDirs)));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -212,7 +229,9 @@ public class LinuxContainerExecutor exte
try {
Path pidFilePath = getPidFilePath(containerId);
if (pidFilePath != null) {
- List<String> command = new ArrayList<String>(Arrays.asList(
+ List<String> command = new ArrayList<String>();
+ addSchedPriorityCommand(command);
+ command.addAll(Arrays.asList(
containerExecutorExe, user, Integer
.toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
containerIdStr, containerWorkDir.toString(),
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Mon Apr 1 16:47:16 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.no
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
@@ -305,7 +304,7 @@ public class LocalDirsHandlerService ext
ArrayList<String> validPaths = new ArrayList<String>();
for (int i = 0; i < paths.length; ++i) {
try {
- URI uriPath = new URI(paths[i]);
+ URI uriPath = (new Path(paths[i])).toUri();
if (uriPath.getScheme() == null
|| uriPath.getScheme().equals(FILE_SCHEME)) {
validPaths.add(uriPath.getPath());
@@ -316,7 +315,7 @@ public class LocalDirsHandlerService ext
+ " is not a valid path. Path should be with " + FILE_SCHEME
+ " scheme or without scheme");
}
- } catch (URISyntaxException e) {
+ } catch (IllegalArgumentException e) {
LOG.warn(e.getMessage());
throw new YarnException(paths[i]
+ " is not a valid path. Path should be with " + FILE_SCHEME
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Apr 1 16:47:16 2013
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.Co
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.Records;
+import com.google.common.annotations.VisibleForTesting;
+
public class NodeManager extends CompositeService
implements EventHandler<NodeManagerEvent> {
@@ -113,6 +115,10 @@ public class NodeManager extends Composi
return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
}
+ protected DeletionService createDeletionService(ContainerExecutor exec) {
+ return new DeletionService(exec);
+ }
+
protected void doSecureLogin() throws IOException {
SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
YarnConfiguration.NM_PRINCIPAL);
@@ -143,7 +149,7 @@ public class NodeManager extends Composi
} catch (IOException e) {
throw new YarnException("Failed to initialize container executor", e);
}
- DeletionService del = new DeletionService(exec);
+ DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager level dispatcher
@@ -350,6 +356,11 @@ public class NodeManager extends Composi
ContainerManagerImpl getContainerManager() {
return containerManager;
}
+
+ @VisibleForTesting
+ Context getNMContext() {
+ return this.context;
+ }
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Apr 1 16:47:16 2013
@@ -50,12 +50,12 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl exten
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
try {
- // this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
this.httpPort = httpBindAddress.getPort();
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
@@ -189,23 +188,91 @@ public class NodeStatusUpdaterImpl exten
}
private void registerWithRM() throws YarnRemoteException {
- this.resourceTracker = getRMClient();
- LOG.info("Connecting to ResourceManager at " + this.rmAddress);
-
- RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+ Configuration conf = getConfig();
+ long rmConnectWaitMS =
+ conf.getInt(
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
+ * 1000;
+ long rmConnectionRetryIntervalMS =
+ conf.getLong(
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+ YarnConfiguration
+ .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+ * 1000;
+
+ if(rmConnectionRetryIntervalMS < 0) {
+ throw new YarnException("Invalid Configuration. " +
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
+ " should not be negative.");
+ }
+
+ boolean waitForEver = (rmConnectWaitMS == -1000);
+
+ if(! waitForEver) {
+ if(rmConnectWaitMS < 0) {
+ throw new YarnException("Invalid Configuration. " +
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
+ " can be -1, but can not be other negative numbers");
+ }
+
+ //try connect once
+ if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+ LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+ + " is smaller than "
+ + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ + ". Only try connect once.");
+ rmConnectWaitMS = 0;
+ }
+ }
+
+ int rmRetryCount = 0;
+ long waitStartTime = System.currentTimeMillis();
+
+ RegisterNodeManagerRequest request =
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
- RegistrationResponse regResponse =
- this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+ RegisterNodeManagerResponse regNMResponse;
+
+ while(true) {
+ try {
+ rmRetryCount++;
+ LOG.info("Connecting to ResourceManager at " + this.rmAddress
+ + ". current no. of attempts is " + rmRetryCount);
+ this.resourceTracker = getRMClient();
+ regNMResponse =
+ this.resourceTracker.registerNodeManager(request);
+ break;
+ } catch(Throwable e) {
+ LOG.warn("Trying to connect to ResourceManager, " +
+ "current no. of failed attempts is "+rmRetryCount);
+ if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+ || waitForEver) {
+ try {
+ LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ + " seconds before next connection retry to RM");
+ Thread.sleep(rmConnectionRetryIntervalMS);
+ } catch(InterruptedException ex) {
+ //done nothing
+ }
+ } else {
+ String errorMessage = "Failed to Connect to RM, " +
+ "no. of failed attempts is "+rmRetryCount;
+ LOG.error(errorMessage,e);
+ throw new YarnException(errorMessage,e);
+ }
+ }
+ }
// if the Resourcemanager instructs NM to shutdown.
- if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+ if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
throw new YarnException(
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
}
if (UserGroupInformation.isSecurityEnabled()) {
- MasterKey masterKey = regResponse.getMasterKey();
+ MasterKey masterKey = regNMResponse.getMasterKey();
// do this now so that its set before we start heartbeating to RM
LOG.info("Security enabled - updating secret keys now");
// It is expected that status updater is started by this point and
@@ -340,8 +407,8 @@ public class NodeStatusUpdaterImpl exten
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey());
}
- HeartbeatResponse response =
- resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+ NodeHeartbeatResponse response =
+ resourceTracker.nodeHeartbeat(request);
// See if the master-key has rolled over
if (isSecurityEnabled()) {
@@ -371,14 +438,14 @@ public class NodeStatusUpdaterImpl exten
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
- .getContainersToCleanupList();
+ .getContainersToCleanup();
if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
}
List<ApplicationId> appsToCleanup =
- response.getApplicationsToCleanupList();
+ response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {