You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2011/09/29 02:42:55 UTC
svn commit: r1177130 [3/7] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apa...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java Thu Sep 29 00:42:47 2011
@@ -206,6 +206,30 @@ public class JobReportPBImpl extends Pro
builder.setJobName((jobName));
}
+ @Override
+ public String getTrackingUrl() {
+ JobReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getTrackingUrl());
+ }
+
+ @Override
+ public void setTrackingUrl(String trackingUrl) {
+ maybeInitBuilder();
+ builder.setTrackingUrl(trackingUrl);
+ }
+
+ @Override
+ public String getDiagnostics() {
+ JobReportProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getDiagnostics();
+ }
+
+ @Override
+ public void setDiagnostics(String diagnostics) {
+ maybeInitBuilder();
+ builder.setDiagnostics(diagnostics);
+ }
+
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Thu Sep 29 00:42:47 2011
@@ -489,7 +489,7 @@ public class JobHistoryUtils {
sb.append(address.getHostName());
}
sb.append(":").append(address.getPort());
- sb.append("/yarn/job/"); // TODO This will change when the history server
+ sb.append("/jobhistory/job/"); // TODO This will change when the history server
// understands apps.
// TOOD Use JobId toString once UI stops using _id_id
sb.append("job_").append(appId.getClusterTimestamp());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Sep 29 00:42:47 2011
@@ -39,14 +39,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -167,7 +167,7 @@ public class MRApps extends Apps {
return TaskAttemptStateUI.valueOf(attemptStateStr);
}
- public static void setInitialClasspath(
+ private static void setMRFrameworkClasspath(
Map<String, String> environment) throws IOException {
InputStream classpathFileStream = null;
BufferedReader reader = null;
@@ -182,30 +182,17 @@ public class MRApps extends Apps {
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
String cp = reader.readLine();
if (cp != null) {
- addToClassPath(environment, cp.trim());
+ addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
}
// Put the file itself on classpath for tasks.
- addToClassPath(environment,
+ addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile());
- // If runtime env is different.
- if (System.getenv().get("YARN_HOME") != null) {
- ShellCommandExecutor exec =
- new ShellCommandExecutor(new String[] {
- System.getenv().get("YARN_HOME") + "/bin/yarn",
- "classpath" });
- exec.execute();
- addToClassPath(environment, exec.getOutput().trim());
- }
-
- // Get yarn mapreduce-app classpath
- if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) {
- ShellCommandExecutor exec =
- new ShellCommandExecutor(new String[] {
- System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred",
- "classpath" });
- exec.execute();
- addToClassPath(environment, exec.getOutput().trim());
+ // Add standard Hadoop classes
+ for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
+ addToEnvironment(environment, Environment.CLASSPATH.name(), c);
}
} finally {
if (classpathFileStream != null) {
@@ -217,20 +204,35 @@ public class MRApps extends Apps {
}
// TODO: Remove duplicates.
}
+
+ private static final String SYSTEM_PATH_SEPARATOR =
+ System.getProperty("path.separator");
- public static void addToClassPath(
- Map<String, String> environment, String fileName) {
- String classpath = environment.get(CLASSPATH);
- if (classpath == null) {
- classpath = fileName;
+ public static void addToEnvironment(
+ Map<String, String> environment,
+ String variable, String value) {
+ String val = environment.get(variable);
+ if (val == null) {
+ val = value;
} else {
- classpath = classpath + ":" + fileName;
+ val = val + SYSTEM_PATH_SEPARATOR + value;
}
- environment.put(CLASSPATH, classpath);
+ environment.put(variable, val);
}
- public static final String CLASSPATH = "CLASSPATH";
-
+ public static void setClasspath(Map<String, String> environment)
+ throws IOException {
+ MRApps.addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
+ MRJobConfig.JOB_JAR);
+ MRApps.addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$() + Path.SEPARATOR + "*");
+ MRApps.setMRFrameworkClasspath(environment);
+ }
+
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(
@@ -241,7 +243,7 @@ public class MRApps extends Apps {
public static String getJobFile(Configuration conf, String user,
org.apache.hadoop.mapreduce.JobID jobId) {
Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
- jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
+ jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
return jobFile.toString();
}
@@ -260,12 +262,11 @@ public class MRApps extends Apps {
public static void setupDistributedCache(
Configuration conf,
- Map<String, LocalResource> localResources,
- Map<String, String> env)
+ Map<String, LocalResource> localResources)
throws IOException {
// Cache archives
- parseDistributedCacheArtifacts(conf, localResources, env,
+ parseDistributedCacheArtifacts(conf, localResources,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
@@ -275,7 +276,7 @@ public class MRApps extends Apps {
// Cache files
parseDistributedCacheArtifacts(conf,
- localResources, env,
+ localResources,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
@@ -290,7 +291,6 @@ public class MRApps extends Apps {
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,
- Map<String, String> env,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
@@ -339,9 +339,6 @@ public class MRApps extends Apps {
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
- if (classPaths.containsKey(u.getPath())) {
- MRApps.addToClassPath(env, linkName);
- }
}
}
}
@@ -358,6 +355,42 @@ public class MRApps extends Apps {
}
return result;
}
+
+ public static void setEnvFromInputString(Map<String, String> env,
+ String envString) {
+ if (envString != null && envString.length() > 0) {
+ String childEnvs[] = envString.split(",");
+ for (String cEnv : childEnvs) {
+ String[] parts = cEnv.split("="); // split on '='
+ String value = env.get(parts[0]);
+
+ if (value != null) {
+ // Replace $env with the child's env constructed by NM's
+ // For example: LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // example PATH=$PATH:/tmp
+ value = System.getenv(parts[0]);
+ if (value != null) {
+ // the env key is present in the tt's env
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // check for simple variable substitution
+ // for e.g. ROOT=$HOME
+ String envValue = System.getenv(parts[1].substring(1));
+ if (envValue != null) {
+ value = envValue;
+ } else {
+ // the env key is note present anywhere .. simply set it
+ // example X=$X:/tmp or X=/tmp
+ value = parts[1].replace("$" + parts[0], "");
+ }
+ }
+ }
+ addToEnvironment(env, parts[0], value);
+ }
+ }
+ }
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Thu Sep 29 00:42:47 2011
@@ -19,27 +19,25 @@
package org.apache.hadoop.mapreduce.v2.util;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils {
- private static final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
-
public static JobId newJobId(ApplicationId appId, int id) {
- JobId jobId = recordFactory.newRecordInstance(JobId.class);
+ JobId jobId = Records.newRecord(JobId.class);
jobId.setAppId(appId);
jobId.setId(id);
return jobId;
}
public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
- TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
+ TaskId taskId = Records.newRecord(TaskId.class);
taskId.setJobId(jobId);
taskId.setId(id);
taskId.setTaskType(taskType);
@@ -48,9 +46,27 @@ public class MRBuilderUtils {
public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
TaskAttemptId taskAttemptId =
- recordFactory.newRecordInstance(TaskAttemptId.class);
+ Records.newRecord(TaskAttemptId.class);
taskAttemptId.setTaskId(taskId);
taskAttemptId.setId(attemptId);
return taskAttemptId;
}
+
+ public static JobReport newJobReport(JobId jobId, String jobName,
+ String userName, JobState state, long startTime, long finishTime,
+ float setupProgress, float mapProgress, float reduceProgress,
+ float cleanupProgress) {
+ JobReport report = Records.newRecord(JobReport.class);
+ report.setJobId(jobId);
+ report.setJobName(jobName);
+ report.setUser(userName);
+ report.setJobState(state);
+ report.setStartTime(startTime);
+ report.setFinishTime(finishTime);
+ report.setSetupProgress(setupProgress);
+ report.setCleanupProgress(cleanupProgress);
+ report.setMapProgress(mapProgress);
+ report.setReduceProgress(reduceProgress);
+ return report;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Sep 29 00:42:47 2011
@@ -143,6 +143,8 @@ message JobReportProto {
optional int64 finish_time = 8;
optional string user = 9;
optional string jobName = 10;
+ optional string trackingUrl = 11;
+ optional string diagnostics = 12;
}
enum TaskAttemptCompletionEventStatusProto {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Sep 29 00:42:47 2011
@@ -19,11 +19,14 @@ package org.apache.hadoop.mapreduce;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Test;
@@ -67,4 +70,14 @@ public class TestTypeConverter {
Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
}
+
+ @Test
+ public void testFromYarnQueueInfo() {
+ org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = new QueueInfoPBImpl();
+ queueInfo.setQueueState(org.apache.hadoop.yarn.api.records.QueueState.STOPPED);
+ org.apache.hadoop.mapreduce.QueueInfo returned =
+ TypeConverter.fromYarn(queueInfo, new Configuration());
+ Assert.assertEquals("queueInfo translation didn't work.",
+ returned.getState().toString(), queueInfo.getQueueState().toString().toLowerCase());
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Sep 29 00:42:47 2011
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -115,7 +114,8 @@ public class TestMRApps {
@Test public void testGetJobFileWithUser() {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
- String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345));
+ String jobFile = MRApps.getJobFile(conf, "dummy-user",
+ new JobID("dummy-job", 12345));
assertNotNull("getJobFile results in null.", jobFile);
assertEquals("jobFile with specified user is not as expected.",
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Sep 29 00:42:47 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Re
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
@@ -560,7 +561,7 @@ public class BackupStore<K,V> {
private Writer<K,V> createSpillFile() throws IOException {
Path tmp =
- new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_"
+ new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
+ (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 29 00:42:47 2011
@@ -348,6 +348,7 @@ public class JobConf extends Configurati
*/
public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
+
/**
* Construct a map/reduce job configuration.
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Thu Sep 29 00:42:47 2011
@@ -321,6 +321,10 @@ public class JobStatus extends org.apach
super.setJobACLs(acls);
}
+ public synchronized void setFailureInfo(String failureInfo) {
+ super.setFailureInfo(failureInfo);
+ }
+
/**
* Set the priority of the job, defaulting to NORMAL.
* @param jp new job priority
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java Thu Sep 29 00:42:47 2011
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
/*******************************
* Some handy constants
*
*******************************/
-interface MRConstants {
+@Private
+@Unstable
+public interface MRConstants {
//
// Timeouts, constants
//
@@ -53,5 +58,6 @@ interface MRConstants {
*/
public static final String FOR_REDUCE_TASK = "for-reduce-task";
+ /** Used in MRv1, mostly in TaskTracker code **/
public static final String WORKDIR = "work";
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java Thu Sep 29 00:42:47 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* Manipulate the working area for the transient store for maps and reduces.
@@ -54,7 +55,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputFile()
throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, getConf());
}
@@ -68,7 +69,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputFileForWrite(long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, size, getConf());
}
@@ -89,7 +90,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputIndexFile()
throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
getConf());
}
@@ -104,7 +105,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputIndexFileForWrite(long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
size, getConf());
}
@@ -128,7 +129,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillFile(int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out", getConf());
}
@@ -143,7 +144,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out", size, getConf());
}
@@ -157,7 +158,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillIndexFile(int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out.index", getConf());
}
@@ -172,7 +173,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ spillNumber + ".out.index", size, getConf());
}
@@ -187,7 +188,7 @@ public class MROutputFiles extends MapOu
public Path getInputFile(int mapId)
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, Integer
+ REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, Integer
.valueOf(mapId)), getConf());
}
@@ -204,7 +205,7 @@ public class MROutputFiles extends MapOu
long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, mapId.getId()),
+ REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
size, getConf());
}
@@ -212,7 +213,7 @@ public class MROutputFiles extends MapOu
@Override
public void removeAll()
throws IOException {
- ((JobConf)getConf()).deleteLocalFiles(Constants.OUTPUT);
+ ((JobConf)getConf()).deleteLocalFiles(MRJobConfig.OUTPUT);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu Sep 29 00:42:47 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -75,10 +76,18 @@ public class TaskLog {
}
}
}
-
+
+ public static String getMRv2LogDir() {
+ return System.getProperty(MRJobConfig.TASK_LOG_DIR);
+ }
+
public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
LogName filter) {
- return new File(getAttemptDir(taskid, isCleanup), filter.toString());
+ if (getMRv2LogDir() != null) {
+ return new File(getMRv2LogDir(), filter.toString());
+ } else {
+ return new File(getAttemptDir(taskid, isCleanup), filter.toString());
+ }
}
static File getRealTaskLogFileLocation(TaskAttemptID taskid,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java Thu Sep 29 00:42:47 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred.pipes;
+import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
@@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import javax.crypto.SecretKey;
@@ -111,7 +113,6 @@ class Application<K1 extends WritableCom
if (interpretor != null) {
cmd.add(interpretor);
}
-
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
if (!new File(executable).canExecute()) {
// LinuxTaskController sets +x permissions on all distcache files already.
@@ -129,7 +130,7 @@ class Application<K1 extends WritableCom
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
-
+
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java Thu Sep 29 00:42:47 2011
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.util.
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
/**
* Provides a way to access information about the map/reduce cluster.
@@ -68,30 +68,41 @@ public class Cluster {
}
public Cluster(Configuration conf) throws IOException {
- this.conf = conf;
- this.ugi = UserGroupInformation.getCurrentUser();
- for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
- ClientProtocol clientProtocol = provider.create(conf);
- if (clientProtocol != null) {
- clientProtocolProvider = provider;
- client = clientProtocol;
- break;
- }
- }
+ this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
- ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
+ initialize(jobTrackAddr, conf);
+ }
+
+ private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
+ throws IOException {
+
+ for (ClientProtocolProvider provider : ServiceLoader
+ .load(ClientProtocolProvider.class)) {
+ ClientProtocol clientProtocol = null;
+ if (jobTrackAddr == null) {
+ clientProtocol = provider.create(conf);
+ } else {
+ clientProtocol = provider.create(jobTrackAddr, conf);
+ }
+
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
break;
}
}
+
+ if (null == clientProtocolProvider || null == client) {
+ throw new IOException(
+ "Cannot initialize Cluster. Please check your configuration for "
+ + MRConfig.FRAMEWORK_NAME
+ + " and the correspond server addresses.");
+ }
}
ClientProtocol getClient() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu Sep 29 00:42:47 2011
@@ -1239,7 +1239,8 @@ public class Job extends JobContextImpl
if (success) {
LOG.info("Job " + jobId + " completed successfully");
} else {
- LOG.info("Job " + jobId + " failed with state " + status.getState());
+ LOG.info("Job " + jobId + " failed with state " + status.getState() +
+ " due to: " + status.getFailureInfo());
}
Counters counters = getCounters();
if (counters != null) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Thu Sep 29 00:42:47 2011
@@ -81,6 +81,7 @@ public class JobStatus implements Writab
private String queue;
private JobPriority priority;
private String schedulingInfo="NA";
+ private String failureInfo = "NA";
private Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
@@ -279,6 +280,14 @@ public class JobStatus implements Writab
}
/**
+ * Set diagnostic information.
+ * @param failureInfo diagnostic information
+ */
+ protected synchronized void setFailureInfo(String failureInfo) {
+ this.failureInfo = failureInfo;
+ }
+
+ /**
* Get queue name
* @return queue name
*/
@@ -359,6 +368,15 @@ public class JobStatus implements Writab
*/
public synchronized JobPriority getPriority() { return priority; }
+ /**
+ * Gets any available info on the reason of failure of the job.
+ * @return diagnostic information on why a job might have failed.
+ */
+ public synchronized String getFailureInfo() {
+ return this.failureInfo;
+ }
+
+
/**
* Returns true if the status is for a completed job.
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Sep 29 00:42:47 2011
@@ -210,6 +210,8 @@ public interface MRJobConfig {
public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
+ public static final String DEFAULT_LOG_LEVEL = "INFO";
+
public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
@@ -330,9 +332,15 @@ public interface MRJobConfig {
MR_AM_PREFIX+"num-progress-splits";
public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
- /** Number of threads user to launch containers in the app master.*/
- public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT =
- MR_AM_PREFIX+"containerlauncher.thread-count";
+ /**
+ * Upper limit on the number of threads user to launch containers in the app
+ * master. Expect level config, you shouldn't be needing it in most cases.
+ */
+ public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+ MR_AM_PREFIX+"containerlauncher.thread-count-limit";
+
+ public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+ 500;
/** Number of threads to handle job client RPC requests.*/
public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
@@ -400,4 +408,69 @@ public interface MRJobConfig {
*/
public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
MR_AM_PREFIX + "create-intermediate-jh-base-dir";
+
+ public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
+ "mapreduce.admin.map.child.java.opts";
+
+ public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
+ "mapreduce.admin.reduce.child.java.opts";
+
+ public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
+ "-Djava.net.preferIPv4Stack=true " +
+ "-Dhadoop.metrics.log.level=WARN ";
+
+ public static final String MAPRED_ADMIN_USER_SHELL =
+ "mapreduce.admin.user.shell";
+
+ public static final String DEFAULT_SHELL = "/bin/bash";
+
+ public static final String MAPRED_ADMIN_USER_ENV =
+ "mapreduce.admin.user.env";
+
+ public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
+ "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib";
+
+ public static final String WORKDIR = "work";
+
+ public static final String OUTPUT = "output";
+
+ public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+
+ public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
+
+ public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
+
+ // This should be the directory where splits file gets localized on the node
+ // running ApplicationMaster.
+ public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
+
+ // This should be the name of the localized job-configuration file on the node
+ // running ApplicationMaster and Task
+ public static final String JOB_CONF_FILE = "job.xml";
+
+ // This should be the name of the localized job-jar file on the node running
+ // individual containers/tasks.
+ public static final String JOB_JAR = "job.jar";
+
+ public static final String JOB_SPLIT = "job.split";
+
+ public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
+
+ public static final String APPLICATION_MASTER_CLASS =
+ "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
+
+ // The token file for the application. Should contain tokens for access to
+ // remote file system and may optionally contain application specific tokens.
+ // For now, generated by the AppManagers and used by NodeManagers and the
+ // Containers.
+ public static final String APPLICATION_TOKENS_FILE = "appTokens";
+
+ /** The log directory for the containers */
+ public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
+
+ public static final String TASK_LOG_SIZE = MR_PREFIX + "container.log.filesize";
+
+ public static final String MAPREDUCE_V2_CHILD_CLASS =
+ "org.apache.hadoop.mapred.YarnChild";
+
}
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 29 00:42:47 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1173011
+/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1177128
/hadoop/core/branches/branch-0.19/mapred/src/java/mapred-default.xml:713112
/hadoop/core/trunk/src/mapred/mapred-default.xml:776175-785643
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Sep 29 00:42:47 2011
@@ -135,7 +135,7 @@ public class HistoryClientService extend
webApp = new HsWebApp(history);
String bindAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
- WebApps.$for("yarn", this).at(bindAddress).start(webApp);
+ WebApps.$for("jobhistory", this).at(bindAddress).start(webApp);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Sep 29 00:42:47 2011
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -84,25 +83,6 @@ public class JobHistory extends Abstract
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
- /*
- * TODO Get rid of this once JobId has it's own comparator
- */
- private static final Comparator<JobId> JOB_ID_COMPARATOR =
- new Comparator<JobId>() {
- @Override
- public int compare(JobId o1, JobId o2) {
- if (o1.getAppId().getClusterTimestamp() >
- o2.getAppId().getClusterTimestamp()) {
- return 1;
- } else if (o1.getAppId().getClusterTimestamp() <
- o2.getAppId().getClusterTimestamp()) {
- return -1;
- } else {
- return o1.getId() - o2.getId();
- }
- }
- };
-
private static String DONE_BEFORE_SERIAL_TAIL =
JobHistoryUtils.doneSubdirsBeforeSerialTail();
@@ -118,19 +98,19 @@ public class JobHistory extends Abstract
//Maintains minimal details for recent jobs (parsed from history file name).
//Sorted on Job Completion Time.
private final SortedMap<JobId, MetaInfo> jobListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>(JOB_ID_COMPARATOR);
+ new ConcurrentSkipListMap<JobId, MetaInfo>();
// Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
// Check for existance of the object when using iterators.
private final SortedMap<JobId, MetaInfo> intermediateListCache =
- new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(JOB_ID_COMPARATOR);
+ new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
//Maintains a list of known done subdirectories. Not currently used.
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
private final SortedMap<JobId, Job> loadedJobCache =
- new ConcurrentSkipListMap<JobId, Job>(JOB_ID_COMPARATOR);
+ new ConcurrentSkipListMap<JobId, Job>();
/**
* Maintains a mapping between intermediate user directories and the last
@@ -673,7 +653,7 @@ public class JobHistory extends Abstract
private Map<JobId, Job> getAllJobsInternal() {
//TODO This should ideally be using getAllJobsMetaInfo
// or get rid of that method once Job has APIs for user, finishTime etc.
- SortedMap<JobId, Job> result = new TreeMap<JobId, Job>(JOB_ID_COMPARATOR);
+ SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
try {
scanIntermediateDirectory();
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Thu Sep 29 00:42:47 2011
@@ -66,6 +66,12 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>test</scope>
</dependency>
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Thu Sep 29 00:42:47 2011
@@ -1,20 +1,20 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.mapred;
@@ -42,29 +42,29 @@ public class ClientCache {
private final Configuration conf;
private final ResourceMgrDelegate rm;
-
+
private static final Log LOG = LogFactory.getLog(ClientCache.class);
private Map<JobID, ClientServiceDelegate> cache =
- new HashMap<JobID, ClientServiceDelegate>();
-
+ new HashMap<JobID, ClientServiceDelegate>();
+
private MRClientProtocol hsProxy;
- ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+ public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf;
this.rm = rm;
}
//TODO: evict from the cache on some threshold
- synchronized ClientServiceDelegate getClient(JobID jobId) {
- if (hsProxy == null) {
+ public synchronized ClientServiceDelegate getClient(JobID jobId) {
+ if (hsProxy == null) {
try {
- hsProxy = instantiateHistoryProxy();
- } catch (IOException e) {
- LOG.warn("Could not connect to History server.", e);
- throw new YarnException("Could not connect to History server.", e);
- }
- }
+ hsProxy = instantiateHistoryProxy();
+ } catch (IOException e) {
+ LOG.warn("Could not connect to History server.", e);
+ throw new YarnException("Could not connect to History server.", e);
+ }
+ }
ClientServiceDelegate client = cache.get(jobId);
if (client == null) {
client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
@@ -74,7 +74,7 @@ public class ClientCache {
}
private MRClientProtocol instantiateHistoryProxy()
- throws IOException {
+ throws IOException {
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
if (StringUtils.isEmpty(serviceAddr)) {
return null;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 29 00:42:47 2011
@@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
-class ClientServiceDelegate {
+public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
// Caches for per-user NotRunningJobs
@@ -87,7 +87,7 @@ class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
- ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+ public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
@@ -101,16 +101,20 @@ class ClientServiceDelegate {
// Get the instance of the NotRunningJob corresponding to the specified
// user and state
- private NotRunningJob getNotRunningJob(String user, JobState state) {
+ private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
+ JobState state) {
synchronized (notRunningJobs) {
HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
if (map == null) {
map = new HashMap<String, NotRunningJob>();
notRunningJobs.put(state, map);
}
+ String user =
+ (applicationReport == null) ?
+ UNKNOWN_USER : applicationReport.getUser();
NotRunningJob notRunningJob = map.get(user);
if (notRunningJob == null) {
- notRunningJob = new NotRunningJob(user, state);
+ notRunningJob = new NotRunningJob(applicationReport, state);
map.put(user, notRunningJob);
}
return notRunningJob;
@@ -130,7 +134,7 @@ class ClientServiceDelegate {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
- return checkAndGetHSProxy(UNKNOWN_USER, JobState.NEW);
+ return checkAndGetHSProxy(null, JobState.NEW);
}
try {
if (application.getHost() == null || "".equals(application.getHost())) {
@@ -171,7 +175,7 @@ class ClientServiceDelegate {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
- return checkAndGetHSProxy(UNKNOWN_USER, JobState.RUNNING);
+ return checkAndGetHSProxy(null, JobState.RUNNING);
}
} catch (InterruptedException e) {
LOG.warn("getProxy() call interruped", e);
@@ -191,17 +195,17 @@ class ClientServiceDelegate {
if (application.getState() == ApplicationState.NEW ||
application.getState() == ApplicationState.SUBMITTED) {
realProxy = null;
- return getNotRunningJob(user, JobState.NEW);
+ return getNotRunningJob(application, JobState.NEW);
}
if (application.getState() == ApplicationState.FAILED) {
realProxy = null;
- return getNotRunningJob(user, JobState.FAILED);
+ return getNotRunningJob(application, JobState.FAILED);
}
if (application.getState() == ApplicationState.KILLED) {
realProxy = null;
- return getNotRunningJob(user, JobState.KILLED);
+ return getNotRunningJob(application, JobState.KILLED);
}
//History server can serve a job only if application
@@ -209,15 +213,16 @@ class ClientServiceDelegate {
if (application.getState() == ApplicationState.SUCCEEDED) {
LOG.info("Application state is completed. " +
"Redirecting to job history server");
- realProxy = checkAndGetHSProxy(user, JobState.SUCCEEDED);
+ realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
}
return realProxy;
}
- private MRClientProtocol checkAndGetHSProxy(String user, JobState state) {
+ private MRClientProtocol checkAndGetHSProxy(
+ ApplicationReport applicationReport, JobState state) {
if (null == historyServerProxy) {
LOG.warn("Job History Server is not configured.");
- return getNotRunningJob(user, state);
+ return getNotRunningJob(applicationReport, state);
}
return historyServerProxy;
}
@@ -274,7 +279,7 @@ class ClientServiceDelegate {
}
}
- org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
@@ -285,7 +290,7 @@ class ClientServiceDelegate {
}
- TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
.toYarn(arg0);
@@ -303,7 +308,7 @@ class ClientServiceDelegate {
.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
}
- String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
@@ -321,24 +326,25 @@ class ClientServiceDelegate {
return result;
}
- JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
+ GetJobReportRequest request =
+ recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
- //TODO: add tracking url in JobReport
- return TypeConverter.fromYarn(report, jobFile, "");
+ return TypeConverter.fromYarn(report, jobFile);
}
- org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+ GetTaskReportsRequest request =
+ recordFactory.newRecordInstance(GetTaskReportsRequest.class);
request.setJobId(jobId);
request.setTaskType(TypeConverter.toYarn(taskType));
@@ -350,7 +356,7 @@ class ClientServiceDelegate {
(taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
}
- boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+ public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
@@ -366,7 +372,7 @@ class ClientServiceDelegate {
return true;
}
- boolean killJob(JobID oldJobID)
+ public boolean killJob(JobID oldJobID)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Thu Sep 29 00:42:47 2011
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -53,20 +55,41 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class NotRunningJob implements MRClientProtocol {
+ private static final Log LOG = LogFactory.getLog(NotRunningJob.class);
+
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final JobState jobState;
- private final String user;
-
- NotRunningJob(String username, JobState jobState) {
- this.user = username;
+ private final ApplicationReport applicationReport;
+
+
+ private ApplicationReport getUnknownApplicationReport() {
+ ApplicationReport unknown =
+ recordFactory.newRecordInstance(ApplicationReport.class);
+ unknown.setUser("N/A");
+ unknown.setHost("N/A");
+ unknown.setName("N/A");
+ unknown.setQueue("N/A");
+ unknown.setStartTime(0);
+ unknown.setFinishTime(0);
+ unknown.setTrackingUrl("N/A");
+ unknown.setDiagnostics("N/A");
+ LOG.info("getUnknownApplicationReport");
+ return unknown;
+ }
+
+ NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+ this.applicationReport =
+ (applicationReport == null) ?
+ getUnknownApplicationReport() : applicationReport;
this.jobState = jobState;
}
@@ -101,15 +124,19 @@ public class NotRunningJob implements MR
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
- GetJobReportResponse resp =
- recordFactory.newRecordInstance(GetJobReportResponse.class);
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
- jobReport.setJobState(this.jobState);
+ jobReport.setJobState(jobState);
+ jobReport.setUser(applicationReport.getUser());
+ jobReport.setStartTime(applicationReport.getStartTime());
+ jobReport.setDiagnostics(applicationReport.getDiagnostics());
+ jobReport.setJobName(applicationReport.getName());
+ jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
+ jobReport.setFinishTime(applicationReport.getFinishTime());
- jobReport.setUser(this.user);
- // TODO: Add jobName & other job information that is available
+ GetJobReportResponse resp =
+ recordFactory.newRecordInstance(GetJobReportResponse.class);
resp.setJobReport(jobReport);
return resp;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Sep 29 00:42:47 2011
@@ -32,19 +32,19 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -79,6 +79,10 @@ public class ResourceMgrDelegate {
private ApplicationId applicationId;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ /**
+ * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+ * @param conf the configuration object.
+ */
public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
@@ -97,6 +101,16 @@ public class ResourceMgrDelegate {
LOG.info("Connected to ResourceManager at " + rmAddress);
}
+ /**
+ * Used for injecting applicationsManager, mostly for testing.
+ * @param conf the configuration object
+ * @param applicationsManager the handle to talk the resource managers {@link ClientRMProtocol}.
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) {
+ this.conf = conf;
+ this.applicationsManager = applicationsManager;
+ }
+
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
return;
@@ -155,8 +169,8 @@ public class ResourceMgrDelegate {
}
public JobID getNewJobID() throws IOException, InterruptedException {
- GetNewApplicationIdRequest request = recordFactory.newRecordInstance(GetNewApplicationIdRequest.class);
- applicationId = applicationsManager.getNewApplicationId(request).getApplicationId();
+ GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
+ applicationId = applicationsManager.getNewApplication(request).getApplicationId();
return TypeConverter.fromYarn(applicationId);
}
@@ -254,7 +268,7 @@ public class ResourceMgrDelegate {
public String getSystemDir() throws IOException, InterruptedException {
- Path sysDir = new Path(MRConstants.JOB_SUBMIT_DIR);
+ Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
//FileContext.getFileContext(conf).delete(sysDir, true);
return sysDir.toString();
}
@@ -294,9 +308,9 @@ public class ResourceMgrDelegate {
}
public void killApplication(ApplicationId applicationId) throws IOException {
- FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class);
+ KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class);
request.setApplicationId(applicationId);
- applicationsManager.finishApplication(request);
+ applicationsManager.forceKillApplication(request);
LOG.info("Killing application " + applicationId);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1177130&r1=1177129&r2=1177130&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Sep 29 00:42:47 2011
@@ -51,7 +51,6 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@@ -60,6 +59,7 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -105,10 +105,22 @@ public class YARNRunner implements Clien
* @param resMgrDelegate the resourcemanager client handle.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+ this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+ }
+
+ /**
+ * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+ * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+ * @param conf the configuration object
+ * @param resMgrDelegate the resource manager delegate
+ * @param clientCache the client cache object.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+ ClientCache clientCache) {
this.conf = conf;
try {
this.resMgrDelegate = resMgrDelegate;
- this.clientCache = new ClientCache(this.conf, resMgrDelegate);
+ this.clientCache = clientCache;
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
@@ -210,7 +222,7 @@ public class YARNRunner implements Clien
// Upload only in security mode: TODO
Path applicationTokensFile =
- new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
try {
ts.writeTokenStorageFile(applicationTokensFile, conf);
} catch (IOException e) {
@@ -226,7 +238,9 @@ public class YARNRunner implements Clien
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
- String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
+ String diagnostics =
+ (appMaster == null ?
+ "application report is null" : appMaster.getDiagnostics());
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|| appMaster.getState() == ApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
@@ -263,7 +277,7 @@ public class YARNRunner implements Clien
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
- Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
+ Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils
.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
@@ -272,13 +286,13 @@ public class YARNRunner implements Clien
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
- localResources.put(MRConstants.JOB_CONF_FILE,
+ localResources.put(MRJobConfig.JOB_CONF_FILE,
createApplicationResource(defaultFileContext,
jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) {
- localResources.put(MRConstants.JOB_JAR,
+ localResources.put(MRJobConfig.JOB_JAR,
createApplicationResource(defaultFileContext,
- new Path(jobSubmitDir, MRConstants.JOB_JAR)));
+ new Path(jobSubmitDir, MRJobConfig.JOB_JAR)));
} else {
// Job jar may be null. For e.g, for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.
@@ -287,10 +301,12 @@ public class YARNRunner implements Clien
}
// TODO gross hack
- for (String s : new String[] { "job.split", "job.splitmetainfo",
- MRConstants.APPLICATION_TOKENS_FILE }) {
+ for (String s : new String[] {
+ MRJobConfig.JOB_SPLIT,
+ MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.APPLICATION_TOKENS_FILE }) {
localResources.put(
- MRConstants.JOB_SUBMIT_DIR + "/" + s,
+ MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, s)));
}
@@ -304,22 +320,24 @@ public class YARNRunner implements Clien
}
// Setup the command to run the AM
- String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
- vargs.add(javaHome + "/bin/java");
- vargs.add("-Dhadoop.root.logger="
- + conf.get(MRJobConfig.MR_AM_LOG_OPTS,
- MRJobConfig.DEFAULT_MR_AM_LOG_OPTS) + ",console");
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+ long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
- vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
- vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
- vargs.add(String.valueOf(applicationId.getId()));
- vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING);
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+ vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+
Vector<String> vargsFinal = new Vector<String>(8);
// Final commmand
@@ -332,15 +350,13 @@ public class YARNRunner implements Clien
LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
- // Setup the environment - Add { job jar, MR app jar } to classpath.
+ // Setup the CLASSPATH in environment
+ // i.e. add { job jar, CWD, Hadoop jars} to classpath.
Map<String, String> environment = new HashMap<String, String>();
- MRApps.setInitialClasspath(environment);
- MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
- MRApps.addToClassPath(environment,
- MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
-
+ MRApps.setClasspath(environment);
+
// Parse distributed cache
- MRApps.setupDistributedCache(jobConf, localResources, environment);
+ MRApps.setupDistributedCache(jobConf, localResources);
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
@@ -425,9 +441,35 @@ public class YARNRunner implements Clien
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
- if (!clientCache.getClient(arg0).killJob(arg0)) {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- }
+ /* check if the status is not running, if not send kill to RM */
+ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+ if (status.getState() != JobStatus.State.RUNNING) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ return;
+ }
+
+ try {
+ /* send a kill to the AM */
+ clientCache.getClient(arg0).killJob(arg0);
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeKillIssued = currentTimeMillis;
+ while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+ != JobStatus.State.KILLED)) {
+ try {
+ Thread.sleep(1000L);
+ } catch(InterruptedException ie) {
+ /** interrupted, just break */
+ break;
+ }
+ currentTimeMillis = System.currentTimeMillis();
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
+ }
+ } catch(IOException io) {
+ LOG.debug("Error when checking for application status", io);
+ }
+ if (status.getState() != JobStatus.State.KILLED) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ }
}
@Override