You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2011/09/29 02:33:42 UTC
svn commit: r1177127 [3/6] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apa...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Sep 29 00:33:34 2011
@@ -39,14 +39,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -167,7 +167,7 @@ public class MRApps extends Apps {
return TaskAttemptStateUI.valueOf(attemptStateStr);
}
- private static void setMRFrameworkClasspath(
+ public static void setInitialClasspath(
Map<String, String> environment) throws IOException {
InputStream classpathFileStream = null;
BufferedReader reader = null;
@@ -182,17 +182,30 @@ public class MRApps extends Apps {
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
String cp = reader.readLine();
if (cp != null) {
- addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
+ addToClassPath(environment, cp.trim());
}
// Put the file itself on classpath for tasks.
- addToEnvironment(
- environment,
- Environment.CLASSPATH.name(),
+ addToClassPath(environment,
thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile());
- // Add standard Hadoop classes
- for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
- addToEnvironment(environment, Environment.CLASSPATH.name(), c);
+ // If runtime env is different.
+ if (System.getenv().get("YARN_HOME") != null) {
+ ShellCommandExecutor exec =
+ new ShellCommandExecutor(new String[] {
+ System.getenv().get("YARN_HOME") + "/bin/yarn",
+ "classpath" });
+ exec.execute();
+ addToClassPath(environment, exec.getOutput().trim());
+ }
+
+ // Get yarn mapreduce-app classpath
+ if (System.getenv().get("HADOOP_MAPRED_HOME")!= null) {
+ ShellCommandExecutor exec =
+ new ShellCommandExecutor(new String[] {
+ System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred",
+ "classpath" });
+ exec.execute();
+ addToClassPath(environment, exec.getOutput().trim());
}
} finally {
if (classpathFileStream != null) {
@@ -204,35 +217,20 @@ public class MRApps extends Apps {
}
// TODO: Remove duplicates.
}
-
- private static final String SYSTEM_PATH_SEPARATOR =
- System.getProperty("path.separator");
- public static void addToEnvironment(
- Map<String, String> environment,
- String variable, String value) {
- String val = environment.get(variable);
- if (val == null) {
- val = value;
+ public static void addToClassPath(
+ Map<String, String> environment, String fileName) {
+ String classpath = environment.get(CLASSPATH);
+ if (classpath == null) {
+ classpath = fileName;
} else {
- val = val + SYSTEM_PATH_SEPARATOR + value;
+ classpath = classpath + ":" + fileName;
}
- environment.put(variable, val);
+ environment.put(CLASSPATH, classpath);
}
- public static void setClasspath(Map<String, String> environment)
- throws IOException {
- MRApps.addToEnvironment(
- environment,
- Environment.CLASSPATH.name(),
- MRJobConfig.JOB_JAR);
- MRApps.addToEnvironment(
- environment,
- Environment.CLASSPATH.name(),
- Environment.PWD.$() + Path.SEPARATOR + "*");
- MRApps.setMRFrameworkClasspath(environment);
- }
-
+ public static final String CLASSPATH = "CLASSPATH";
+
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(
@@ -243,7 +241,7 @@ public class MRApps extends Apps {
public static String getJobFile(Configuration conf, String user,
org.apache.hadoop.mapreduce.JobID jobId) {
Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
- jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
+ jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
return jobFile.toString();
}
@@ -262,11 +260,12 @@ public class MRApps extends Apps {
public static void setupDistributedCache(
Configuration conf,
- Map<String, LocalResource> localResources)
+ Map<String, LocalResource> localResources,
+ Map<String, String> env)
throws IOException {
// Cache archives
- parseDistributedCacheArtifacts(conf, localResources,
+ parseDistributedCacheArtifacts(conf, localResources, env,
LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
@@ -276,7 +275,7 @@ public class MRApps extends Apps {
// Cache files
parseDistributedCacheArtifacts(conf,
- localResources,
+ localResources, env,
LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
@@ -291,6 +290,7 @@ public class MRApps extends Apps {
private static void parseDistributedCacheArtifacts(
Configuration conf,
Map<String, LocalResource> localResources,
+ Map<String, String> env,
LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] pathsToPutOnClasspath) throws IOException {
@@ -339,6 +339,9 @@ public class MRApps extends Apps {
: LocalResourceVisibility.PRIVATE,
sizes[i], timestamps[i])
);
+ if (classPaths.containsKey(u.getPath())) {
+ MRApps.addToClassPath(env, linkName);
+ }
}
}
}
@@ -355,42 +358,6 @@ public class MRApps extends Apps {
}
return result;
}
-
- public static void setEnvFromInputString(Map<String, String> env,
- String envString) {
- if (envString != null && envString.length() > 0) {
- String childEnvs[] = envString.split(",");
- for (String cEnv : childEnvs) {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
-
- if (value != null) {
- // Replace $env with the child's env constructed by NM's
- // For example: LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // check for simple variable substitution
- // for e.g. ROOT=$HOME
- String envValue = System.getenv(parts[1].substring(1));
- if (envValue != null) {
- value = envValue;
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- }
- addToEnvironment(env, parts[0], value);
- }
- }
- }
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Thu Sep 29 00:33:34 2011
@@ -19,25 +19,27 @@
package org.apache.hadoop.mapreduce.v2.util;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class MRBuilderUtils {
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
public static JobId newJobId(ApplicationId appId, int id) {
- JobId jobId = Records.newRecord(JobId.class);
+ JobId jobId = recordFactory.newRecordInstance(JobId.class);
jobId.setAppId(appId);
jobId.setId(id);
return jobId;
}
public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
- TaskId taskId = Records.newRecord(TaskId.class);
+ TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
taskId.setJobId(jobId);
taskId.setId(id);
taskId.setTaskType(taskType);
@@ -46,27 +48,9 @@ public class MRBuilderUtils {
public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
TaskAttemptId taskAttemptId =
- Records.newRecord(TaskAttemptId.class);
+ recordFactory.newRecordInstance(TaskAttemptId.class);
taskAttemptId.setTaskId(taskId);
taskAttemptId.setId(attemptId);
return taskAttemptId;
}
-
- public static JobReport newJobReport(JobId jobId, String jobName,
- String userName, JobState state, long startTime, long finishTime,
- float setupProgress, float mapProgress, float reduceProgress,
- float cleanupProgress) {
- JobReport report = Records.newRecord(JobReport.class);
- report.setJobId(jobId);
- report.setJobName(jobName);
- report.setUser(userName);
- report.setJobState(state);
- report.setStartTime(startTime);
- report.setFinishTime(finishTime);
- report.setSetupProgress(setupProgress);
- report.setCleanupProgress(cleanupProgress);
- report.setMapProgress(mapProgress);
- report.setReduceProgress(reduceProgress);
- return report;
- }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Sep 29 00:33:34 2011
@@ -143,8 +143,6 @@ message JobReportProto {
optional int64 finish_time = 8;
optional string user = 9;
optional string jobName = 10;
- optional string trackingUrl = 11;
- optional string diagnostics = 12;
}
enum TaskAttemptCompletionEventStatusProto {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Sep 29 00:33:34 2011
@@ -19,14 +19,11 @@ package org.apache.hadoop.mapreduce;
import junit.framework.Assert;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Test;
@@ -70,14 +67,4 @@ public class TestTypeConverter {
Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
}
-
- @Test
- public void testFromYarnQueueInfo() {
- org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = new QueueInfoPBImpl();
- queueInfo.setQueueState(org.apache.hadoop.yarn.api.records.QueueState.STOPPED);
- org.apache.hadoop.mapreduce.QueueInfo returned =
- TypeConverter.fromYarn(queueInfo, new Configuration());
- Assert.assertEquals("queueInfo translation didn't work.",
- returned.getState().toString(), queueInfo.getQueueState().toString().toLowerCase());
- }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Sep 29 00:33:34 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -114,8 +115,7 @@ public class TestMRApps {
@Test public void testGetJobFileWithUser() {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
- String jobFile = MRApps.getJobFile(conf, "dummy-user",
- new JobID("dummy-job", 12345));
+ String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345));
assertNotNull("getJobFile results in null.", jobFile);
assertEquals("jobFile with specified user is not as expected.",
"/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Sep 29 00:33:34 2011
@@ -41,7 +41,6 @@ import org.apache.hadoop.mapred.IFile.Re
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
@@ -561,7 +560,7 @@ public class BackupStore<K,V> {
private Writer<K,V> createSpillFile() throws IOException {
Path tmp =
- new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
+ new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_"
+ (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 29 00:33:34 2011
@@ -348,7 +348,6 @@ public class JobConf extends Configurati
*/
public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
-
/**
* Construct a map/reduce job configuration.
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Thu Sep 29 00:33:34 2011
@@ -321,10 +321,6 @@ public class JobStatus extends org.apach
super.setJobACLs(acls);
}
- public synchronized void setFailureInfo(String failureInfo) {
- super.setFailureInfo(failureInfo);
- }
-
/**
* Set the priority of the job, defaulting to NORMAL.
* @param jp new job priority
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java Thu Sep 29 00:33:34 2011
@@ -17,16 +17,11 @@
*/
package org.apache.hadoop.mapred;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
/*******************************
* Some handy constants
*
*******************************/
-@Private
-@Unstable
-public interface MRConstants {
+interface MRConstants {
//
// Timeouts, constants
//
@@ -58,6 +53,5 @@ public interface MRConstants {
*/
public static final String FOR_REDUCE_TASK = "for-reduce-task";
- /** Used in MRv1, mostly in TaskTracker code **/
public static final String WORKDIR = "work";
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MROutputFiles.java Thu Sep 29 00:33:34 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* Manipulate the working area for the transient store for maps and reduces.
@@ -55,7 +54,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputFile()
throws IOException {
- return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, getConf());
}
@@ -69,7 +68,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputFileForWrite(long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING, size, getConf());
}
@@ -90,7 +89,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputIndexFile()
throws IOException {
- return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
getConf());
}
@@ -105,7 +104,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getOutputIndexFileForWrite(long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + Path.SEPARATOR
+ return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + Path.SEPARATOR
+ MAP_OUTPUT_FILENAME_STRING + MAP_OUTPUT_INDEX_SUFFIX_STRING,
size, getConf());
}
@@ -129,7 +128,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillFile(int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
+ spillNumber + ".out", getConf());
}
@@ -144,7 +143,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillFileForWrite(int spillNumber, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
+ spillNumber + ".out", size, getConf());
}
@@ -158,7 +157,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillIndexFile(int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(MRJobConfig.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathToRead(Constants.OUTPUT + "/spill"
+ spillNumber + ".out.index", getConf());
}
@@ -173,7 +172,7 @@ public class MROutputFiles extends MapOu
@Override
public Path getSpillIndexFileForWrite(int spillNumber, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
+ return lDirAlloc.getLocalPathForWrite(Constants.OUTPUT + "/spill"
+ spillNumber + ".out.index", size, getConf());
}
@@ -188,7 +187,7 @@ public class MROutputFiles extends MapOu
public Path getInputFile(int mapId)
throws IOException {
return lDirAlloc.getLocalPathToRead(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, Integer
+ REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, Integer
.valueOf(mapId)), getConf());
}
@@ -205,7 +204,7 @@ public class MROutputFiles extends MapOu
long size)
throws IOException {
return lDirAlloc.getLocalPathForWrite(String.format(
- REDUCE_INPUT_FILE_FORMAT_STRING, MRJobConfig.OUTPUT, mapId.getId()),
+ REDUCE_INPUT_FILE_FORMAT_STRING, Constants.OUTPUT, mapId.getId()),
size, getConf());
}
@@ -213,7 +212,7 @@ public class MROutputFiles extends MapOu
@Override
public void removeAll()
throws IOException {
- ((JobConf)getConf()).deleteLocalFiles(MRJobConfig.OUTPUT);
+ ((JobConf)getConf()).deleteLocalFiles(Constants.OUTPUT);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Thu Sep 29 00:33:34 2011
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -76,18 +75,10 @@ public class TaskLog {
}
}
}
-
- public static String getMRv2LogDir() {
- return System.getProperty(MRJobConfig.TASK_LOG_DIR);
- }
-
+
public static File getTaskLogFile(TaskAttemptID taskid, boolean isCleanup,
LogName filter) {
- if (getMRv2LogDir() != null) {
- return new File(getMRv2LogDir(), filter.toString());
- } else {
- return new File(getAttemptDir(taskid, isCleanup), filter.toString());
- }
+ return new File(getAttemptDir(taskid, isCleanup), filter.toString());
}
static File getRealTaskLogFileLocation(TaskAttemptID taskid,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java Thu Sep 29 00:33:34 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapred.pipes;
-import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
@@ -27,7 +26,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import javax.crypto.SecretKey;
@@ -113,6 +111,7 @@ class Application<K1 extends WritableCom
if (interpretor != null) {
cmd.add(interpretor);
}
+
String executable = DistributedCache.getLocalCacheFiles(conf)[0].toString();
if (!new File(executable).canExecute()) {
// LinuxTaskController sets +x permissions on all distcache files already.
@@ -130,7 +129,7 @@ class Application<K1 extends WritableCom
long logLength = TaskLog.getTaskLogLength(conf);
cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
false);
-
+
process = runClient(cmd, env);
clientSocket = serverSocket.accept();
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java Thu Sep 29 00:33:34 2011
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.util.
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
/**
* Provides a way to access information about the map/reduce cluster.
@@ -68,41 +68,30 @@ public class Cluster {
}
public Cluster(Configuration conf) throws IOException {
- this(null, conf);
+ this.conf = conf;
+ this.ugi = UserGroupInformation.getCurrentUser();
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+ ClientProtocol clientProtocol = provider.create(conf);
+ if (clientProtocol != null) {
+ clientProtocolProvider = provider;
+ client = clientProtocol;
+ break;
+ }
+ }
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- initialize(jobTrackAddr, conf);
- }
-
- private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
- throws IOException {
-
- for (ClientProtocolProvider provider : ServiceLoader
- .load(ClientProtocolProvider.class)) {
- ClientProtocol clientProtocol = null;
- if (jobTrackAddr == null) {
- clientProtocol = provider.create(conf);
- } else {
- clientProtocol = provider.create(jobTrackAddr, conf);
- }
-
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+ ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
break;
}
}
-
- if (null == clientProtocolProvider || null == client) {
- throw new IOException(
- "Cannot initialize Cluster. Please check your configuration for "
- + MRConfig.FRAMEWORK_NAME
- + " and the correspond server addresses.");
- }
}
ClientProtocol getClient() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu Sep 29 00:33:34 2011
@@ -1239,8 +1239,7 @@ public class Job extends JobContextImpl
if (success) {
LOG.info("Job " + jobId + " completed successfully");
} else {
- LOG.info("Job " + jobId + " failed with state " + status.getState() +
- " due to: " + status.getFailureInfo());
+ LOG.info("Job " + jobId + " failed with state " + status.getState());
}
Counters counters = getCounters();
if (counters != null) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Thu Sep 29 00:33:34 2011
@@ -81,7 +81,6 @@ public class JobStatus implements Writab
private String queue;
private JobPriority priority;
private String schedulingInfo="NA";
- private String failureInfo = "NA";
private Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
@@ -280,14 +279,6 @@ public class JobStatus implements Writab
}
/**
- * Set diagnostic information.
- * @param failureInfo diagnostic information
- */
- protected synchronized void setFailureInfo(String failureInfo) {
- this.failureInfo = failureInfo;
- }
-
- /**
* Get queue name
* @return queue name
*/
@@ -368,15 +359,6 @@ public class JobStatus implements Writab
*/
public synchronized JobPriority getPriority() { return priority; }
- /**
- * Gets any available info on the reason of failure of the job.
- * @return diagnostic information on why a job might have failed.
- */
- public synchronized String getFailureInfo() {
- return this.failureInfo;
- }
-
-
/**
* Returns true if the status is for a completed job.
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Sep 29 00:33:34 2011
@@ -210,8 +210,6 @@ public interface MRJobConfig {
public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
- public static final String DEFAULT_LOG_LEVEL = "INFO";
-
public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
@@ -332,15 +330,9 @@ public interface MRJobConfig {
MR_AM_PREFIX+"num-progress-splits";
public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
- /**
- * Upper limit on the number of threads user to launch containers in the app
- * master. Expect level config, you shouldn't be needing it in most cases.
- */
- public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
- MR_AM_PREFIX+"containerlauncher.thread-count-limit";
-
- public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
- 500;
+ /** Number of threads used to launch containers in the app master.*/
+ public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT =
+ MR_AM_PREFIX+"containerlauncher.thread-count";
/** Number of threads to handle job client RPC requests.*/
public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
@@ -408,69 +400,4 @@ public interface MRJobConfig {
*/
public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR =
MR_AM_PREFIX + "create-intermediate-jh-base-dir";
-
- public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
- "mapreduce.admin.map.child.java.opts";
-
- public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
- "mapreduce.admin.reduce.child.java.opts";
-
- public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
- "-Djava.net.preferIPv4Stack=true " +
- "-Dhadoop.metrics.log.level=WARN ";
-
- public static final String MAPRED_ADMIN_USER_SHELL =
- "mapreduce.admin.user.shell";
-
- public static final String DEFAULT_SHELL = "/bin/bash";
-
- public static final String MAPRED_ADMIN_USER_ENV =
- "mapreduce.admin.user.env";
-
- public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
- "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib";
-
- public static final String WORKDIR = "work";
-
- public static final String OUTPUT = "output";
-
- public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
-
- public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
-
- public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
-
- // This should be the directory where splits file gets localized on the node
- // running ApplicationMaster.
- public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
-
- // This should be the name of the localized job-configuration file on the node
- // running ApplicationMaster and Task
- public static final String JOB_CONF_FILE = "job.xml";
-
- // This should be the name of the localized job-jar file on the node running
- // individual containers/tasks.
- public static final String JOB_JAR = "job.jar";
-
- public static final String JOB_SPLIT = "job.split";
-
- public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
-
- public static final String APPLICATION_MASTER_CLASS =
- "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
-
- // The token file for the application. Should contain tokens for access to
- // remote file system and may optionally contain application specific tokens.
- // For now, generated by the AppManagers and used by NodeManagers and the
- // Containers.
- public static final String APPLICATION_TOKENS_FILE = "appTokens";
-
- /** The log directory for the containers */
- public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
-
- public static final String TASK_LOG_SIZE = MR_PREFIX + "container.log.filesize";
-
- public static final String MAPREDUCE_V2_CHILD_CLASS =
- "org.apache.hadoop.mapred.YarnChild";
-
}
Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 29 00:33:34 2011
@@ -1,3 +1,3 @@
-/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1177115
+/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1173011
/hadoop/core/branches/branch-0.19/mapred/src/java/mapred-default.xml:713112
/hadoop/core/trunk/src/mapred/mapred-default.xml:776175-785643
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Sep 29 00:33:34 2011
@@ -135,7 +135,7 @@ public class HistoryClientService extend
webApp = new HsWebApp(history);
String bindAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
- WebApps.$for("jobhistory", this).at(bindAddress).start(webApp);
+ WebApps.$for("yarn", this).at(bindAddress).start(webApp);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Sep 29 00:33:34 2011
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -83,6 +84,25 @@ public class JobHistory extends Abstract
private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+ /*
+ * TODO Get rid of this once JobId has its own comparator
+ */
+ private static final Comparator<JobId> JOB_ID_COMPARATOR =
+ new Comparator<JobId>() {
+ @Override
+ public int compare(JobId o1, JobId o2) {
+ if (o1.getAppId().getClusterTimestamp() >
+ o2.getAppId().getClusterTimestamp()) {
+ return 1;
+ } else if (o1.getAppId().getClusterTimestamp() <
+ o2.getAppId().getClusterTimestamp()) {
+ return -1;
+ } else {
+ return o1.getId() - o2.getId();
+ }
+ }
+ };
+
private static String DONE_BEFORE_SERIAL_TAIL =
JobHistoryUtils.doneSubdirsBeforeSerialTail();
@@ -98,19 +118,19 @@ public class JobHistory extends Abstract
//Maintains minimal details for recent jobs (parsed from history file name).
//Sorted on Job Completion Time.
private final SortedMap<JobId, MetaInfo> jobListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>();
+ new ConcurrentSkipListMap<JobId, MetaInfo>(JOB_ID_COMPARATOR);
// Re-use existing MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
// Check for existence of the object when using iterators.
private final SortedMap<JobId, MetaInfo> intermediateListCache =
- new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
+ new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>(JOB_ID_COMPARATOR);
//Maintains a list of known done subdirectories. Not currently used.
private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
private final SortedMap<JobId, Job> loadedJobCache =
- new ConcurrentSkipListMap<JobId, Job>();
+ new ConcurrentSkipListMap<JobId, Job>(JOB_ID_COMPARATOR);
/**
* Maintains a mapping between intermediate user directories and the last
@@ -653,7 +673,7 @@ public class JobHistory extends Abstract
private Map<JobId, Job> getAllJobsInternal() {
//TODO This should ideally be using getAllJobsMetaInfo
// or get rid of that method once Job has APIs for user, finishTime etc.
- SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
+ SortedMap<JobId, Job> result = new TreeMap<JobId, Job>(JOB_ID_COMPARATOR);
try {
scanIntermediateDirectory();
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Thu Sep 29 00:33:34 2011
@@ -66,12 +66,6 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
<scope>test</scope>
</dependency>
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Thu Sep 29 00:33:34 2011
@@ -1,20 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.hadoop.mapred;
@@ -42,29 +42,29 @@ public class ClientCache {
private final Configuration conf;
private final ResourceMgrDelegate rm;
-
+
private static final Log LOG = LogFactory.getLog(ClientCache.class);
private Map<JobID, ClientServiceDelegate> cache =
- new HashMap<JobID, ClientServiceDelegate>();
-
+ new HashMap<JobID, ClientServiceDelegate>();
+
private MRClientProtocol hsProxy;
- public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+ ClientCache(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf;
this.rm = rm;
}
//TODO: evict from the cache on some threshold
- public synchronized ClientServiceDelegate getClient(JobID jobId) {
- if (hsProxy == null) {
+ synchronized ClientServiceDelegate getClient(JobID jobId) {
+ if (hsProxy == null) {
try {
- hsProxy = instantiateHistoryProxy();
- } catch (IOException e) {
- LOG.warn("Could not connect to History server.", e);
- throw new YarnException("Could not connect to History server.", e);
- }
- }
+ hsProxy = instantiateHistoryProxy();
+ } catch (IOException e) {
+ LOG.warn("Could not connect to History server.", e);
+ throw new YarnException("Could not connect to History server.", e);
+ }
+ }
ClientServiceDelegate client = cache.get(jobId);
if (client == null) {
client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
@@ -74,7 +74,7 @@ public class ClientCache {
}
private MRClientProtocol instantiateHistoryProxy()
- throws IOException {
+ throws IOException {
final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
if (StringUtils.isEmpty(serviceAddr)) {
return null;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 29 00:33:34 2011
@@ -70,7 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
-public class ClientServiceDelegate {
+class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
// Caches for per-user NotRunningJobs
@@ -87,7 +87,7 @@ public class ClientServiceDelegate {
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static String UNKNOWN_USER = "Unknown User";
- public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+ ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
JobID jobId, MRClientProtocol historyServerProxy) {
this.conf = new Configuration(conf); // Cloning for modifying.
// For faster redirects from AM to HS.
@@ -101,20 +101,16 @@ public class ClientServiceDelegate {
// Get the instance of the NotRunningJob corresponding to the specified
// user and state
- private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
- JobState state) {
+ private NotRunningJob getNotRunningJob(String user, JobState state) {
synchronized (notRunningJobs) {
HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
if (map == null) {
map = new HashMap<String, NotRunningJob>();
notRunningJobs.put(state, map);
}
- String user =
- (applicationReport == null) ?
- UNKNOWN_USER : applicationReport.getUser();
NotRunningJob notRunningJob = map.get(user);
if (notRunningJob == null) {
- notRunningJob = new NotRunningJob(applicationReport, state);
+ notRunningJob = new NotRunningJob(user, state);
map.put(user, notRunningJob);
}
return notRunningJob;
@@ -134,7 +130,7 @@ public class ClientServiceDelegate {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
- return checkAndGetHSProxy(null, JobState.NEW);
+ return checkAndGetHSProxy(UNKNOWN_USER, JobState.NEW);
}
try {
if (application.getHost() == null || "".equals(application.getHost())) {
@@ -175,7 +171,7 @@ public class ClientServiceDelegate {
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
- return checkAndGetHSProxy(null, JobState.RUNNING);
+ return checkAndGetHSProxy(UNKNOWN_USER, JobState.RUNNING);
}
} catch (InterruptedException e) {
LOG.warn("getProxy() call interruped", e);
@@ -195,17 +191,17 @@ public class ClientServiceDelegate {
if (application.getState() == ApplicationState.NEW ||
application.getState() == ApplicationState.SUBMITTED) {
realProxy = null;
- return getNotRunningJob(application, JobState.NEW);
+ return getNotRunningJob(user, JobState.NEW);
}
if (application.getState() == ApplicationState.FAILED) {
realProxy = null;
- return getNotRunningJob(application, JobState.FAILED);
+ return getNotRunningJob(user, JobState.FAILED);
}
if (application.getState() == ApplicationState.KILLED) {
realProxy = null;
- return getNotRunningJob(application, JobState.KILLED);
+ return getNotRunningJob(user, JobState.KILLED);
}
//History server can serve a job only if application
@@ -213,16 +209,15 @@ public class ClientServiceDelegate {
if (application.getState() == ApplicationState.SUCCEEDED) {
LOG.info("Application state is completed. " +
"Redirecting to job history server");
- realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
+ realProxy = checkAndGetHSProxy(user, JobState.SUCCEEDED);
}
return realProxy;
}
- private MRClientProtocol checkAndGetHSProxy(
- ApplicationReport applicationReport, JobState state) {
+ private MRClientProtocol checkAndGetHSProxy(String user, JobState state) {
if (null == historyServerProxy) {
LOG.warn("Job History Server is not configured.");
- return getNotRunningJob(applicationReport, state);
+ return getNotRunningJob(user, state);
}
return historyServerProxy;
}
@@ -279,7 +274,7 @@ public class ClientServiceDelegate {
}
}
- public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+ org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
@@ -290,7 +285,7 @@ public class ClientServiceDelegate {
}
- public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
.toYarn(arg0);
@@ -308,7 +303,7 @@ public class ClientServiceDelegate {
.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
}
- public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+ String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
throws IOException, InterruptedException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
@@ -326,25 +321,24 @@ public class ClientServiceDelegate {
return result;
}
- public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+ JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- GetJobReportRequest request =
- recordFactory.newRecordInstance(GetJobReportRequest.class);
+ GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
- return TypeConverter.fromYarn(report, jobFile);
+ //TODO: add tracking url in JobReport
+ return TypeConverter.fromYarn(report, jobFile, "");
}
- public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+ org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
throws YarnRemoteException, YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- GetTaskReportsRequest request =
- recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+ GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
request.setJobId(jobId);
request.setTaskType(TypeConverter.toYarn(taskType));
@@ -356,7 +350,7 @@ public class ClientServiceDelegate {
(taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
}
- public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+ boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
@@ -372,7 +366,7 @@ public class ClientServiceDelegate {
return true;
}
- public boolean killJob(JobID oldJobID)
+ boolean killJob(JobID oldJobID)
throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
= TypeConverter.toYarn(oldJobID);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Thu Sep 29 00:33:34 2011
@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -55,41 +53,20 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class NotRunningJob implements MRClientProtocol {
- private static final Log LOG = LogFactory.getLog(NotRunningJob.class);
-
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final JobState jobState;
- private final ApplicationReport applicationReport;
-
-
- private ApplicationReport getUnknownApplicationReport() {
- ApplicationReport unknown =
- recordFactory.newRecordInstance(ApplicationReport.class);
- unknown.setUser("N/A");
- unknown.setHost("N/A");
- unknown.setName("N/A");
- unknown.setQueue("N/A");
- unknown.setStartTime(0);
- unknown.setFinishTime(0);
- unknown.setTrackingUrl("N/A");
- unknown.setDiagnostics("N/A");
- LOG.info("getUnknownApplicationReport");
- return unknown;
- }
-
- NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
- this.applicationReport =
- (applicationReport == null) ?
- getUnknownApplicationReport() : applicationReport;
+ private final String user;
+
+ NotRunningJob(String username, JobState jobState) {
+ this.user = username;
this.jobState = jobState;
}
@@ -124,19 +101,15 @@ public class NotRunningJob implements MR
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
+ GetJobReportResponse resp =
+ recordFactory.newRecordInstance(GetJobReportResponse.class);
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
- jobReport.setJobState(jobState);
- jobReport.setUser(applicationReport.getUser());
- jobReport.setStartTime(applicationReport.getStartTime());
- jobReport.setDiagnostics(applicationReport.getDiagnostics());
- jobReport.setJobName(applicationReport.getName());
- jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
- jobReport.setFinishTime(applicationReport.getFinishTime());
+ jobReport.setJobState(this.jobState);
- GetJobReportResponse resp =
- recordFactory.newRecordInstance(GetJobReportResponse.class);
+ jobReport.setUser(this.user);
+ // TODO: Add jobName & other job information that is available
resp.setJobReport(jobReport);
return resp;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Sep 29 00:33:34 2011
@@ -32,19 +32,19 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
@@ -79,10 +79,6 @@ public class ResourceMgrDelegate {
private ApplicationId applicationId;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- /**
- * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
- * @param conf the configuration object.
- */
public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
@@ -101,16 +97,6 @@ public class ResourceMgrDelegate {
LOG.info("Connected to ResourceManager at " + rmAddress);
}
- /**
- * Used for injecting applicationsManager, mostly for testing.
- * @param conf the configuration object
- * @param applicationsManager the handle to talk the resource managers {@link ClientRMProtocol}.
- */
- public ResourceMgrDelegate(YarnConfiguration conf, ClientRMProtocol applicationsManager) {
- this.conf = conf;
- this.applicationsManager = applicationsManager;
- }
-
public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
throws IOException, InterruptedException {
return;
@@ -169,8 +155,8 @@ public class ResourceMgrDelegate {
}
public JobID getNewJobID() throws IOException, InterruptedException {
- GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
- applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+ GetNewApplicationIdRequest request = recordFactory.newRecordInstance(GetNewApplicationIdRequest.class);
+ applicationId = applicationsManager.getNewApplicationId(request).getApplicationId();
return TypeConverter.fromYarn(applicationId);
}
@@ -268,7 +254,7 @@ public class ResourceMgrDelegate {
public String getSystemDir() throws IOException, InterruptedException {
- Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+ Path sysDir = new Path(MRConstants.JOB_SUBMIT_DIR);
//FileContext.getFileContext(conf).delete(sysDir, true);
return sysDir.toString();
}
@@ -308,9 +294,9 @@ public class ResourceMgrDelegate {
}
public void killApplication(ApplicationId applicationId) throws IOException {
- KillApplicationRequest request = recordFactory.newRecordInstance(KillApplicationRequest.class);
+ FinishApplicationRequest request = recordFactory.newRecordInstance(FinishApplicationRequest.class);
request.setApplicationId(applicationId);
- applicationsManager.forceKillApplication(request);
+ applicationsManager.finishApplication(request);
LOG.info("Killing application " + applicationId);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Sep 29 00:33:34 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
@@ -59,7 +60,6 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
@@ -105,22 +105,10 @@ public class YARNRunner implements Clien
* @param resMgrDelegate the resourcemanager client handle.
*/
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
- this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
- }
-
- /**
- * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
- * but allowing injecting {@link ClientCache}. Enable mocking and testing.
- * @param conf the configuration object
- * @param resMgrDelegate the resource manager delegate
- * @param clientCache the client cache object.
- */
- public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
- ClientCache clientCache) {
this.conf = conf;
try {
this.resMgrDelegate = resMgrDelegate;
- this.clientCache = clientCache;
+ this.clientCache = new ClientCache(this.conf, resMgrDelegate);
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
@@ -222,7 +210,7 @@ public class YARNRunner implements Clien
// Upload only in security mode: TODO
Path applicationTokensFile =
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ new Path(jobSubmitDir, MRConstants.APPLICATION_TOKENS_FILE);
try {
ts.writeTokenStorageFile(applicationTokensFile, conf);
} catch (IOException e) {
@@ -238,9 +226,7 @@ public class YARNRunner implements Clien
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
- String diagnostics =
- (appMaster == null ?
- "application report is null" : appMaster.getDiagnostics());
+ String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics());
if (appMaster == null || appMaster.getState() == ApplicationState.FAILED
|| appMaster.getState() == ApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
@@ -277,7 +263,7 @@ public class YARNRunner implements Clien
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
- Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+ Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils
.getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
@@ -286,13 +272,13 @@ public class YARNRunner implements Clien
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
- localResources.put(MRJobConfig.JOB_CONF_FILE,
+ localResources.put(MRConstants.JOB_CONF_FILE,
createApplicationResource(defaultFileContext,
jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) {
- localResources.put(MRJobConfig.JOB_JAR,
+ localResources.put(MRConstants.JOB_JAR,
createApplicationResource(defaultFileContext,
- new Path(jobSubmitDir, MRJobConfig.JOB_JAR)));
+ new Path(jobSubmitDir, MRConstants.JOB_JAR)));
} else {
// Job jar may be null. E.g., for pipes, the job jar is the hadoop
// mapreduce jar itself which is already on the classpath.
@@ -301,12 +287,10 @@ public class YARNRunner implements Clien
}
// TODO gross hack
- for (String s : new String[] {
- MRJobConfig.JOB_SPLIT,
- MRJobConfig.JOB_SPLIT_METAINFO,
- MRJobConfig.APPLICATION_TOKENS_FILE }) {
+ for (String s : new String[] { "job.split", "job.splitmetainfo",
+ MRConstants.APPLICATION_TOKENS_FILE }) {
localResources.put(
- MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
+ MRConstants.JOB_SUBMIT_DIR + "/" + s,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, s)));
}
@@ -320,24 +304,22 @@ public class YARNRunner implements Clien
}
// Setup the command to run the AM
+ String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
- vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
-
- long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
- vargs.add("-Dlog4j.configuration=container-log4j.properties");
- vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
- vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
+ vargs.add(javaHome + "/bin/java");
+ vargs.add("-Dhadoop.root.logger="
+ + conf.get(MRJobConfig.MR_AM_LOG_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_LOG_OPTS) + ",console");
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
- vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
- Path.SEPARATOR + ApplicationConstants.STDOUT);
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
- Path.SEPARATOR + ApplicationConstants.STDERR);
-
+ vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
+ vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
+ vargs.add(String.valueOf(applicationId.getId()));
+ vargs.add(ApplicationConstants.AM_FAIL_COUNT_STRING);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
Vector<String> vargsFinal = new Vector<String>(8);
// Final command
@@ -350,13 +332,15 @@ public class YARNRunner implements Clien
LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
- // Setup the CLASSPATH in environment
- // i.e. add { job jar, CWD, Hadoop jars} to classpath.
+ // Setup the environment - Add { job jar, MR app jar } to classpath.
Map<String, String> environment = new HashMap<String, String>();
- MRApps.setClasspath(environment);
-
+ MRApps.setInitialClasspath(environment);
+ MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
+ MRApps.addToClassPath(environment,
+ MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+
// Parse distributed cache
- MRApps.setupDistributedCache(jobConf, localResources);
+ MRApps.setupDistributedCache(jobConf, localResources, environment);
// Setup ContainerLaunchContext for AM container
ContainerLaunchContext amContainer =
@@ -441,35 +425,9 @@ public class YARNRunner implements Clien
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
- /* check if the status is not running, if not send kill to RM */
- JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
- if (status.getState() != JobStatus.State.RUNNING) {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- return;
- }
-
- try {
- /* send a kill to the AM */
- clientCache.getClient(arg0).killJob(arg0);
- long currentTimeMillis = System.currentTimeMillis();
- long timeKillIssued = currentTimeMillis;
- while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
- != JobStatus.State.KILLED)) {
- try {
- Thread.sleep(1000L);
- } catch(InterruptedException ie) {
- /** interrupted, just break */
- break;
- }
- currentTimeMillis = System.currentTimeMillis();
- status = clientCache.getClient(arg0).getJobStatus(arg0);
- }
- } catch(IOException io) {
- LOG.debug("Error when checking for application status", io);
- }
- if (status.getState() != JobStatus.State.KILLED) {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- }
+ if (!clientCache.getClient(arg0).killJob(arg0)) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ }
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Sep 29 00:33:34 2011
@@ -68,8 +68,8 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -78,8 +78,8 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
@@ -245,7 +245,7 @@ public class TestClientRedirect {
}
@Override
- public GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request) throws YarnRemoteException {
+ public GetNewApplicationIdResponse getNewApplicationId(GetNewApplicationIdRequest request) throws YarnRemoteException {
return null;
}
@@ -267,13 +267,6 @@ public class TestClientRedirect {
application.setHost(split[0]);
application.setRpcPort(Integer.parseInt(split[1]));
application.setUser("TestClientRedirect-user");
- application.setName("N/A");
- application.setQueue("N/A");
- application.setStartTime(0);
- application.setFinishTime(0);
- application.setTrackingUrl("N/A");
- application.setDiagnostics("N/A");
-
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
response.setApplicationReport(application);
@@ -288,9 +281,9 @@ public class TestClientRedirect {
}
@Override
- public KillApplicationResponse forceKillApplication(
- KillApplicationRequest request) throws YarnRemoteException {
- return recordFactory.newRecordInstance(KillApplicationResponse.class);
+ public FinishApplicationResponse finishApplication(
+ FinishApplicationRequest request) throws YarnRemoteException {
+ return null;
}
@Override
@@ -451,7 +444,7 @@ public class TestClientRedirect {
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
- return recordFactory.newRecordInstance(KillJobResponse.class);
+ return null;
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1177127&r1=1177126&r2=1177127&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu Sep 29 00:33:34 2011
@@ -109,7 +109,7 @@ public class TestClientServiceDelegate {
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
null, getRMDelegate());
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
- Assert.assertEquals("N/A", jobStatus.getUsername());
+ Assert.assertEquals("Unknown User", jobStatus.getUsername());
Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
//RM has app report and job History Server is not configured
@@ -145,13 +145,6 @@ public class TestClientServiceDelegate {
.newRecord(ApplicationReport.class);
applicationReport.setState(ApplicationState.SUCCEEDED);
applicationReport.setUser("root");
- applicationReport.setHost("N/A");
- applicationReport.setName("N/A");
- applicationReport.setQueue("N/A");
- applicationReport.setStartTime(0);
- applicationReport.setFinishTime(0);
- applicationReport.setTrackingUrl("N/A");
- applicationReport.setDiagnostics("N/A");
return applicationReport;
}