Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [28/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Sat Nov 28 20:26:01 2009
@@ -64,7 +64,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
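Context for the hunk above: OutputLogFilter is deprecated in favor of Utils.OutputFileUtils.OutputFilesFilter, which accepts only real output files (skipping _logs and other underscore-prefixed side files) when listing a job's output directory. A minimal sketch of the new idiom, assuming a live FileSystem and an existing output directory; the class and method names below are illustrative, not from this commit:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.Utils;

    public class OutputListingSketch {
      // Returns the first line of the first part-file, or null if none exist.
      static String firstOutputLine(FileSystem fs, Path outputDir)
          throws IOException {
        Path[] outputFiles = FileUtil.stat2Paths(
            fs.listStatus(outputDir,
                          new Utils.OutputFileUtils.OutputFilesFilter()));
        if (outputFiles.length == 0) {
          return null;
        }
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(outputFiles[0])));
        try {
          return reader.readLine();
        } finally {
          reader.close();
        }
      }
    }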
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCommandLineJobSubmission.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -18,11 +19,13 @@
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -59,17 +62,59 @@
FileOutputStream fstream = new FileOutputStream(f);
fstream.write("somestrings".getBytes());
fstream.close();
- String[] args = new String[8];
+ File f1 = new File(thisbuildDir, "files_tmp1");
+ fstream = new FileOutputStream(f1);
+ fstream.write("somestrings".getBytes());
+ fstream.close();
+
+ // copy files to dfs
+ Path cachePath = new Path("/cacheDir");
+ if (!fs.mkdirs(cachePath)) {
+ throw new IOException(
+ "Mkdirs failed to create " + cachePath.toString());
+ }
+ Path localCachePath = new Path(System.getProperty("test.cache.data"));
+ Path txtPath = new Path(localCachePath, new Path("test.txt"));
+ Path jarPath = new Path(localCachePath, new Path("test.jar"));
+ Path zipPath = new Path(localCachePath, new Path("test.zip"));
+ Path tarPath = new Path(localCachePath, new Path("test.tar"));
+ Path tgzPath = new Path(localCachePath, new Path("test.tgz"));
+ fs.copyFromLocalFile(txtPath, cachePath);
+ fs.copyFromLocalFile(jarPath, cachePath);
+ fs.copyFromLocalFile(zipPath, cachePath);
+
+ // construct options for -files
+ String[] files = new String[3];
+ files[0] = f.toString();
+ files[1] = f1.toString() + "#localfilelink";
+ files[2] =
+ fs.getUri().resolve(cachePath + "/test.txt#dfsfilelink").toString();
+
+ // construct options for -libjars
+ String[] libjars = new String[2];
+ libjars[0] = "build/test/mapred/testjar/testjob.jar";
+ libjars[1] = fs.getUri().resolve(cachePath + "/test.jar").toString();
+
+ // construct options for archives
+ String[] archives = new String[3];
+ archives[0] = tgzPath.toString();
+ archives[1] = tarPath + "#tarlink";
+ archives[2] =
+ fs.getUri().resolve(cachePath + "/test.zip#ziplink").toString();
+
+ String[] args = new String[10];
args[0] = "-files";
- args[1] = f.toString();
+ args[1] = StringUtils.arrayToString(files);
args[2] = "-libjars";
// the testjob.jar as a temporary jar file
// rather than creating its own
- args[3] = "build/test/mapred/testjar/testjob.jar";
- args[4] = "-D";
- args[5] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
- args[6] = input.toString();
- args[7] = output.toString();
+ args[3] = StringUtils.arrayToString(libjars);
+ args[4] = "-archives";
+ args[5] = StringUtils.arrayToString(archives);
+ args[6] = "-D";
+ args[7] = "mapred.output.committer.class=testjar.CustomOutputCommitter";
+ args[8] = input.toString();
+ args[9] = output.toString();
JobConf jobConf = mr.createJobConf();
//before running the job, verify that libjar is not in client classpath
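Context for the hunk above: the test now passes comma-separated lists to -files, -libjars and -archives (joined with StringUtils.arrayToString), and exercises the "#name" fragment syntax, which makes the distributed cache expose the file in the task working directory under a symlink of that name. A sketch of the resulting command-line shape; the Tool below and the namenode address are assumptions for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class GenericOptionsSketch extends Configured implements Tool {
      public int run(String[] args) throws Exception {
        // By the time run() is invoked, GenericOptionsParser has already
        // staged the -files/-libjars/-archives entries for the job.
        return 0;
      }
      public static void main(String[] args) throws Exception {
        String[] argv = {
            "-files", "files_tmp,files_tmp1#localfilelink,"
                + "hdfs://nn:8020/cacheDir/test.txt#dfsfilelink",
            "-libjars", "build/test/mapred/testjar/testjob.jar,"
                + "hdfs://nn:8020/cacheDir/test.jar",
            "-archives", "test.tgz,test.tar#tarlink,"
                + "hdfs://nn:8020/cacheDir/test.zip#ziplink",
            "-D", "mapred.output.committer.class=testjar.CustomOutputCommitter",
            "in", "out" };
        System.exit(ToolRunner.run(new Configuration(),
                                   new GenericOptionsSketch(), argv));
      }
    }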
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestComparators.java Sat Nov 28 20:26:01 2009
@@ -127,7 +127,7 @@
public void reduce(IntWritable key, Iterator<Writable> values,
OutputCollector<IntWritable, Text> out,
Reporter reporter) throws IOException {
- int currentKey = ((IntWritable)(key)).get();
+ int currentKey = key.get();
// keys should be in descending order
if (currentKey > lastKey) {
fail("Keys not in sorted descending order");
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestCompressedEmptyMapOutputs.java Sat Nov 28 20:26:01 2009
@@ -49,8 +49,8 @@
throws Exception {
// Scale down the default settings for RandomWriter for the test-case
// Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP -> 1MB
- job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
- job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+ job.setInt(RandomWriter.BYTES_PER_MAP, RW_BYTES_PER_MAP);
+ job.setInt(RandomWriter.MAPS_PER_HOST, RW_MAPS_PER_HOST);
String[] rwArgs = {sortInput.toString()};
// Run RandomWriter
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestControlledMapReduceJob.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
/**
* Test to verify the controlled behavior of a ControlledMapReduceJob.
@@ -42,8 +43,8 @@
throws Exception {
Properties props = new Properties();
- props.setProperty("mapred.tasktracker.map.tasks.maximum", "2");
- props.setProperty("mapred.tasktracker.reduce.tasks.maximum", "2");
+ props.setProperty(TTConfig.TT_MAP_SLOTS, "2");
+ props.setProperty(TTConfig.TT_REDUCE_SLOTS, "2");
startCluster(true, props);
LOG.info("Started the cluster");
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestEmptyJob.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,8 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
/**
* A JUnit test to test Map-Reduce empty jobs.
@@ -48,11 +50,11 @@
MiniMRCluster mr = null;
- /** Committer with cleanup waiting on a signal
+ /** Committer with commit waiting on a signal
*/
- static class CommitterWithDelayCleanup extends FileOutputCommitter {
+ static class CommitterWithDelayCommit extends FileOutputCommitter {
@Override
- public void cleanupJob(JobContext context) throws IOException {
+ public void commitJob(JobContext context) throws IOException {
Configuration conf = context.getConfiguration();
Path share = new Path(conf.get("share"));
FileSystem fs = FileSystem.get(conf);
@@ -64,7 +66,7 @@
}
UtilsForTests.waitFor(100);
}
- super.cleanupJob(context);
+ super.commitJob(context);
}
}
@@ -101,7 +103,7 @@
conf.setJobName("empty");
// use an InputFormat which returns no split
conf.setInputFormat(EmptyInputFormat.class);
- conf.setOutputCommitter(CommitterWithDelayCleanup.class);
+ conf.setOutputCommitter(CommitterWithDelayCommit.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(IdentityMapper.class);
@@ -195,7 +197,8 @@
+ " and not 1.0", runningJob.cleanupProgress() == 1.0);
assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
- FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+ FileStatus[] list = fs.listStatus(outDir,
+ new Utils.OutputFileUtils.OutputFilesFilter());
assertTrue("Number of part-files is " + list.length + " and not "
+ numReduces, list.length == numReduces);
@@ -221,10 +224,10 @@
JobConf conf = new JobConf();
fileSys = FileSystem.get(conf);
- conf.set("mapred.job.tracker.handler.count", "1");
- conf.set("mapred.job.tracker", "127.0.0.1:0");
- conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
- conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+ conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "127.0.0.1:0");
+ conf.set(TTConfig.TT_HTTP_ADDRESS, "127.0.0.1:0");
mr =
new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
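Context for the renames above: job finalization in the OutputCommitter API moved from cleanupJob(JobContext) to commitJob(JobContext), so the test's committer now delays the commit rather than the cleanup. A minimal sketch of the pattern, with a sleep standing in for the test's shared-file handshake:

    import java.io.IOException;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobContext;

    class DelayedCommitSketch extends FileOutputCommitter {
      @Override
      public void commitJob(JobContext context) throws IOException {
        // Wait for an external go-ahead before finalizing job output; the
        // real test polls a file in a shared directory at this point.
        try {
          Thread.sleep(100);
        } catch (InterruptedException ie) {
          throw new IOException(ie.toString());
        }
        super.commitJob(context);
      }
    }

A job opts in with conf.setOutputCommitter(DelayedCommitSketch.class), exactly as the hunk above does with CommitterWithDelayCommit.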
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFieldSelection.java Sat Nov 28 20:26:01 2009
@@ -72,9 +72,9 @@
job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(1);
- job.set("mapred.data.field.separator", "-");
- job.set("map.output.key.value.fields.spec", "6,5,1-3:0-");
- job.set("reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
+ job.set("mapreduce.fieldsel.data.field.separator", "-");
+ job.set("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec", "6,5,1-3:0-");
+ job.set("mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", ":4,3,2,1,0,0-");
JobClient.runJob(job);
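Context for the specs above: a field-selection spec names key fields before the ':' and value fields after it; "1-3" is an inclusive range, "0-" means field 0 through the last field, and selected fields are re-joined with the configured separator ("-" here). A worked illustration under those rules; the sample line is an assumption, not test data:

    input value (separator "-"):   0-1-2-3-4-5-6
    map spec "6,5,1-3:0-"     =>   key   "6-5-1-2-3"
                                   value "0-1-2-3-4-5-6"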
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,8 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
public class TestFileOutputCommitter extends TestCase {
private static Path outDir = new Path(
@@ -35,11 +37,11 @@
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
JobConf job = new JobConf();
- job.set("mapred.task.id", attempt);
+ job.set(JobContext.TASK_ATTEMPT_ID, attempt);
job.setOutputCommitter(FileOutputCommitter.class);
FileOutputFormat.setOutputPath(job, outDir);
- JobContext jContext = new JobContext(job, taskID.getJobID());
- TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+ JobContext jContext = new JobContextImpl(job, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job,
committer.getTempTaskOutputPath(tContext));
@@ -73,7 +75,7 @@
theRecordWriter.close(reporter);
}
committer.commitTask(tContext);
- committer.cleanupJob(jContext);
+ committer.commitJob(jContext);
File expectedFile = new File(new Path(outDir, file).toString());
StringBuffer expectedOutput = new StringBuffer();
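Context for the hunks above: in this branch the old-API JobContext and TaskAttemptContext are constructed through JobContextImpl and TaskAttemptContextImpl, and the success path ends in commitJob rather than cleanupJob. A condensed sketch of the lifecycle the test drives; attempt, taskID and outDir are placeholders:

    JobConf job = new JobConf();
    job.set(JobContext.TASK_ATTEMPT_ID, attempt);      // placeholder attempt id
    job.setOutputCommitter(FileOutputCommitter.class);
    FileOutputFormat.setOutputPath(job, outDir);       // placeholder output dir
    JobContext jContext = new JobContextImpl(job, taskID.getJobID());
    TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
    FileOutputCommitter committer = new FileOutputCommitter();
    committer.setupJob(jContext);      // create the temporary output area
    // ... a RecordWriter writes this task's records to the work path ...
    committer.commitTask(tContext);    // promote the task's output
    committer.commitJob(jContext);     // finalize job output (was cleanupJob)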
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import junit.framework.TestCase;
@@ -44,7 +45,7 @@
Path p = new Path(System.getProperty("test.build.data", "/tmp"),
"cache").makeQualified(fs);
fs.delete(p, true);
- conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+ conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
final int partsPerMap = 1000;
final int bytesPerFile = partsPerMap * 24;
IndexCache cache = new IndexCache(conf);
@@ -111,7 +112,7 @@
Path p = new Path(System.getProperty("test.build.data", "/tmp"),
"cache").makeQualified(fs);
fs.delete(p, true);
- conf.setInt("mapred.tasktracker.indexcache.mb", 1);
+ conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
IndexCache cache = new IndexCache(conf);
Path f = new Path(p, "badindex");
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.UUID;
+import javax.security.auth.login.LoginException;
+
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
@@ -32,7 +34,9 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Re-runs a map task using the IsolationRunner.
@@ -104,15 +108,20 @@
}
private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
- throws IOException {
+ throws IOException,
+ LoginException {
String taskid =
new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
- return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
- TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
+ return new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
+ TaskTracker.getTaskConfFile(UserGroupInformation.login(conf)
+ .getUserName(), jobId.toString(), taskid, false), conf);
}
- public void testIsolationRunOfMapTask() throws
- IOException, InterruptedException, ClassNotFoundException {
+ public void testIsolationRunOfMapTask()
+ throws IOException,
+ InterruptedException,
+ ClassNotFoundException,
+ LoginException {
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(1, "file:///", 4);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJavaSerialization.java Sat Nov 28 20:26:01 2009
@@ -116,7 +116,8 @@
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
- fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+ fs.listStatus(OUTPUT_DIR,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
InputStream is = fs.open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -161,7 +162,8 @@
JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
- fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+ fs.listStatus(OUTPUT_DIR,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(1, outputFiles.length);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java Sat Nov 28 20:26:01 2009
@@ -17,32 +17,24 @@
*/
package org.apache.hadoop.mapred;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.io.PrintStream;
import java.io.Writer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TestMRJobClient;
+import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-public class TestJobClient extends ClusterMapReduceTestCase {
-
- private static final Log LOG = LogFactory.getLog(TestJobClient.class);
+public class TestJobClient extends TestMRJobClient {
private String runJob() throws Exception {
- OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+ OutputStream os = getFileSystem().create(new Path(getInputDir(),
+ "text.txt"));
Writer wr = new OutputStreamWriter(os);
wr.write("hello1\n");
wr.write("hello2\n");
@@ -71,60 +63,27 @@
return JobClient.runJob(conf).getID().toString();
}
- static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
- PrintStream oldOut = System.out;
- PrintStream newOut = new PrintStream(out, true);
- try {
- System.setOut(newOut);
- return ToolRunner.run(conf, tool, args);
- } finally {
- System.setOut(oldOut);
- }
+ public static int runTool(Configuration conf, Tool tool, String[] args,
+ OutputStream out) throws Exception {
+ return TestMRJobClient.runTool(conf, tool, args, out);
}
-
- public void testGetCounter() throws Exception {
- String jobId = runJob();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- int exitCode = runTool(createJobConf(), new JobClient(),
- new String[] { "-counter", jobId,
- "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
- out);
- assertEquals("Exit code", 0, exitCode);
- assertEquals("Counter", "3", out.toString().trim());
+
+ static void verifyJobPriority(String jobId, String priority,
+ JobConf conf) throws Exception {
+ TestJobClient test = new TestJobClient();
+ test.verifyJobPriority(jobId, priority, conf, test.createJobClient());
}
-
- public void testJobList() throws Exception {
+
+ public void testJobClient() throws Exception {
+ Configuration conf = createJobConf();
String jobId = runJob();
- verifyJobPriority(jobId, "HIGH", createJobConf());
- }
-
- static void verifyJobPriority(String jobId, String priority, JobConf conf)
- throws Exception {
- PipedInputStream pis = new PipedInputStream();
- PipedOutputStream pos = new PipedOutputStream(pis);
- int exitCode = runTool(conf, new JobClient(),
- new String[] { "-list", "all" },
- pos);
- assertEquals("Exit code", 0, exitCode);
- BufferedReader br = new BufferedReader(new InputStreamReader(pis));
- String line = null;
- while ((line=br.readLine()) != null) {
- LOG.info("line = " + line);
- if (!line.startsWith(jobId)) {
- continue;
- }
- assertTrue(line.contains(priority));
- break;
- }
- pis.close();
+ testGetCounter(jobId, conf);
+ testJobList(jobId, conf);
+ testChangingJobPriority(jobId, conf);
}
- public void testChangingJobPriority() throws Exception {
- String jobId = runJob();
- int exitCode = runTool(createJobConf(), new JobClient(),
- new String[] { "-set-priority", jobId, "VERY_LOW" },
- new ByteArrayOutputStream());
- assertEquals("Exit code", 0, exitCode);
- verifyJobPriority(jobId, "VERY_LOW", createJobConf());
+ protected CLI createJobClient()
+ throws IOException {
+ return new JobClient();
}
}
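Context for the refactor above: TestJobClient now extends TestMRJobClient and delegates to its stdout-capturing runTool. The removed helper is still the usual idiom for asserting on a CLI tool's console output; a self-contained sketch of it:

    import java.io.OutputStream;
    import java.io.PrintStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    final class ToolOutputCapture {
      // Runs a Tool with System.out redirected into `out`, restoring the
      // original stream afterwards.
      static int runTool(Configuration conf, Tool tool, String[] args,
          OutputStream out) throws Exception {
        PrintStream oldOut = System.out;
        PrintStream newOut = new PrintStream(out, true);
        try {
          System.setOut(newOut);
          return ToolRunner.run(conf, tool, args);
        } finally {
          System.setOut(oldOut);
        }
      }
    }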
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Sat Nov 28 20:26:01 2009
@@ -25,9 +25,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
public class TestJobDirCleanup extends TestCase {
@@ -51,7 +52,7 @@
final int taskTrackers = 10;
Configuration conf = new Configuration();
JobConf mrConf = new JobConf();
- mrConf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+ mrConf.set(TTConfig.TT_REDUCE_SLOTS, "1");
dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Sat Nov 28 20:26:01 2009
@@ -20,9 +20,9 @@
import java.io.IOException;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -64,7 +64,7 @@
// Run a job with jvm reuse
JobConf myConf = getClusterConf();
- myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+ myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");
String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
}
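Context for the one-line change above: the JVM-reuse key mapred.job.reuse.jvm.num.tasks is now reached through the JobContext.JVM_NUMTASKS_TORUN constant, and the value -1 removes the per-JVM task limit so task JVMs are reused for the life of the job. In sketch form:

    JobConf myConf = new JobConf();
    // -1 = no limit on tasks per JVM, i.e. reuse JVMs as long as the job runs
    myConf.set(JobContext.JVM_NUMTASKS_TORUN, "-1");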
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Sat Nov 28 20:26:01 2009
@@ -20,46 +20,51 @@
import java.io.File;
import java.io.IOException;
-import java.text.ParseException;
import java.util.ArrayList;
-import java.util.List;
import java.util.HashMap;
-import java.util.Map;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobHistory.*;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
/**
- * Tests the JobHistory files - to catch any changes to JobHistory that can
- * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
*
* testJobHistoryFile
* Run a job that will be succeeded and validate its history file format and
* content.
*
- * testJobHistoryUserLogLocation
- * Run jobs with the given values of hadoop.job.history.user.location as
- * (1)null(default case), (2)"none", and (3)some user specified dir.
- * Validate user history file location in each case.
- *
* testJobHistoryJobStatus
* Run jobs that will be (1) succeeded (2) failed (3) killed.
* Validate job status read from history file in each case.
*
- * Future changes to job history are to be reflected here in this file.
*/
public class TestJobHistory extends TestCase {
private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
@@ -67,9 +72,8 @@
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
- private static final Pattern digitsPattern =
- Pattern.compile(JobHistory.DIGITS);
-
+ private static final String DIGITS = "[0-9]+";
+
// hostname like /default-rack/host1.foo.com OR host1.foo.com
private static final Pattern hostNamePattern = Pattern.compile(
"(/(([\\w\\-\\.]+)/)+)?([\\w\\-\\.]+)");
@@ -77,10 +81,10 @@
private static final String IP_ADDR =
"\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?\\.\\d\\d?\\d?";
- // hostname like /default-rack/host1.foo.com OR host1.foo.com
private static final Pattern trackerNamePattern = Pattern.compile(
- "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
- IP_ADDR + ":" + JobHistory.DIGITS);
+ "tracker_" + hostNamePattern + ":([\\w\\-\\.]+)/" +
+ IP_ADDR + ":" + DIGITS);
+
private static final Pattern splitsPattern = Pattern.compile(
hostNamePattern + "(," + hostNamePattern + ")*");
@@ -91,202 +95,25 @@
//Each Task End seen from history file is added here
private static List<String> taskEnds = new ArrayList<String>();
- // List of tasks that appear in history file after JT restart. This is to
- // allow START_TIME=0 for these tasks.
- private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
-
- // List of potential tasks whose start time can be 0 because of JT restart
- private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
-
- /**
- * Listener for history log file, it populates JobHistory.JobInfo
- * object with data from log file and validates the data.
- */
- static class TestListener
- extends DefaultJobHistoryParser.JobTasksParseListener {
- int lineNum;//line number of history log file
- boolean isJobLaunched;
- boolean isJTRestarted;
-
- TestListener(JobInfo job) {
- super(job);
- lineNum = 0;
- isJobLaunched = false;
- isJTRestarted = false;
- }
-
- // TestListener implementation
- public void handle(RecordTypes recType, Map<Keys, String> values)
- throws IOException {
-
- lineNum++;
-
- // Check if the record is of type Meta
- if (recType == JobHistory.RecordTypes.Meta) {
- long version = Long.parseLong(values.get(Keys.VERSION));
- assertTrue("Unexpected job history version ",
- (version >= 0 && version <= JobHistory.VERSION));
- }
- else if (recType.equals(RecordTypes.Job)) {
- String jobid = values.get(Keys.JOBID);
- assertTrue("record type 'Job' is seen without JOBID key" +
- " in history file at line " + lineNum, jobid != null);
- JobID id = JobID.forName(jobid);
- assertTrue("JobID in history file is in unexpected format " +
- "at line " + lineNum, id != null);
- String time = values.get(Keys.LAUNCH_TIME);
- if (time != null) {
- if (isJobLaunched) {
- // We assume that if we see LAUNCH_TIME again, it is because of JT restart
- isJTRestarted = true;
- }
- else {// job launched first time
- isJobLaunched = true;
- }
- }
- time = values.get(Keys.FINISH_TIME);
- if (time != null) {
- assertTrue ("Job FINISH_TIME is seen in history file at line " +
- lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
- }
- }
- else if (recType.equals(RecordTypes.Task)) {
- String taskid = values.get(Keys.TASKID);
- assertTrue("record type 'Task' is seen without TASKID key" +
- " in history file at line " + lineNum, taskid != null);
- TaskID id = TaskID.forName(taskid);
- assertTrue("TaskID in history file is in unexpected format " +
- "at line " + lineNum, id != null);
-
- String time = values.get(Keys.START_TIME);
- if (time != null) {
- List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
- assertTrue("Duplicate START_TIME seen for task " + taskid +
- " in history file at line " + lineNum, attemptIDs == null);
- attemptIDs = new ArrayList<String>();
- taskIDsToAttemptIDs.put(taskid, attemptIDs);
-
- if (isJTRestarted) {
- // This maintains a potential ignoreStartTimeTasks list
- tempIgnoreStartTimeOfTasks.add(taskid);
- }
- }
-
- time = values.get(Keys.FINISH_TIME);
- if (time != null) {
- String s = values.get(Keys.TASK_STATUS);
- if (s != null) {
- List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
- assertTrue ("Task FINISH_TIME is seen in history file at line " +
- lineNum + " before START_TIME is seen", attemptIDs != null);
-
- // Check if all the attemptIDs of this task are finished
- assertTrue("TaskId " + taskid + " is finished at line " +
- lineNum + " but its attemptID is not finished.",
- (attemptIDs.size() <= 1));
-
- // Check if at least 1 attempt of this task is seen
- assertTrue("TaskId " + taskid + " is finished at line " +
- lineNum + " but no attemptID is seen before this.",
- attemptIDs.size() == 1);
-
- if (s.equals("KILLED") || s.equals("FAILED")) {
- // Task End with KILLED/FAILED status in history file is
- // considered as TaskEnd, TaskStart. This is useful in checking
- // the order of history lines.
- attemptIDs = new ArrayList<String>();
- taskIDsToAttemptIDs.put(taskid, attemptIDs);
- }
- else {
- taskEnds.add(taskid);
- }
- }
- else {
- // This line of history file could be just an update to finish time
- }
- }
- }
- else if (recType.equals(RecordTypes.MapAttempt) ||
- recType.equals(RecordTypes.ReduceAttempt)) {
- String taskid = values.get(Keys.TASKID);
- assertTrue("record type " + recType + " is seen without TASKID key" +
- " in history file at line " + lineNum, taskid != null);
-
- String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
- TaskAttemptID id = TaskAttemptID.forName(attemptId);
- assertTrue("AttemptID in history file is in unexpected format " +
- "at line " + lineNum, id != null);
-
- String time = values.get(Keys.START_TIME);
- if (time != null) {
- List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
- assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
- " before Task is seen", attemptIDs != null);
- assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
- "file at line " + lineNum, attemptIDs.remove(attemptId));
-
- if (attemptIDs.isEmpty()) {
- //just a boolean whether any attempt is seen or not
- attemptIDs.add("firstAttemptIsSeen");
- }
- attemptIDs.add(attemptId);
-
- if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
- (id.getId() < 1000)) {
- // If Task line of this attempt is seen in history file after
- // JT restart and if this attempt is < 1000 (i.e. attempt is not
- // started after JT restart) - assuming single JT restart happened
- ignoreStartTimeOfTasks.add(taskid);
- }
- }
-
- time = values.get(Keys.FINISH_TIME);
- if (time != null) {
- List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
- assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
- + lineNum + " before Task is seen", attemptIDs != null);
-
- assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
- + lineNum + " before TaskAttempt START_TIME is seen",
- attemptIDs.remove(attemptId));
- }
- }
- super.handle(recType, values);
- }
- }
-
- // Check if the time is in the expected format
- private static boolean isTimeValid(String time) {
- Matcher m = digitsPattern.matcher(time);
- return m.matches() && (Long.parseLong(time) > 0);
- }
-
- private static boolean areTimesInOrder(String time1, String time2) {
- return (Long.parseLong(time1) <= Long.parseLong(time2));
- }
// Validate Format of Job Level Keys, Values read from history file
- private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
+ private static void validateJobLevelKeyValuesFormat(JobInfo jobInfo,
String status) {
- String time = values.get(Keys.SUBMIT_TIME);
- assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
- " in history file", isTimeValid(time));
-
- time = values.get(Keys.LAUNCH_TIME);
- assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
- " in history file", isTimeValid(time));
-
- String time1 = values.get(Keys.FINISH_TIME);
- assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
- " in history file", isTimeValid(time1));
- assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
- areTimesInOrder(time, time1));
+ long submitTime = jobInfo.getSubmitTime();
+ long launchTime = jobInfo.getLaunchTime();
+ long finishTime = jobInfo.getFinishTime();
+
+ assertTrue("Invalid submit time", submitTime > 0);
+ assertTrue("SubmitTime > LaunchTime", submitTime <= launchTime);
+ assertTrue("LaunchTime > FinishTime", launchTime <= finishTime);
+
+ String stat = jobInfo.getJobStatus();
- String stat = values.get(Keys.JOB_STATUS);
assertTrue("Unexpected JOB_STATUS \"" + stat + "\" is seen in" +
" history file", (status.equals(stat)));
+ String priority = jobInfo.getPriority();
- String priority = values.get(Keys.JOB_PRIORITY);
+ assertNotNull(priority);
assertTrue("Unknown priority for the job in history file",
(priority.equals("HIGH") ||
priority.equals("LOW") || priority.equals("NORMAL") ||
@@ -296,34 +123,30 @@
// Validate Format of Task Level Keys, Values read from history file
private static void validateTaskLevelKeyValuesFormat(JobInfo job,
boolean splitsCanBeEmpty) {
- Map<String, JobHistory.Task> tasks = job.getAllTasks();
+ Map<TaskID, TaskInfo> tasks = job.getAllTasks();
// validate info of each task
- for (JobHistory.Task task : tasks.values()) {
+ for (TaskInfo task : tasks.values()) {
- String tid = task.get(Keys.TASKID);
- String time = task.get(Keys.START_TIME);
- // We allow START_TIME=0 for tasks seen in history after JT restart
- if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
- assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
- time + " in history file", isTimeValid(time));
- }
-
- String time1 = task.get(Keys.FINISH_TIME);
- assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
- time1 + " in history file", isTimeValid(time1));
+ TaskID tid = task.getTaskId();
+ long startTime = task.getStartTime();
+ assertTrue("Invalid Start time", startTime > 0);
+
+ long finishTime = task.getFinishTime();
assertTrue("Task FINISH_TIME is < START_TIME in history file",
- areTimesInOrder(time, time1));
+ startTime < finishTime);
// Make sure that the Task type exists and it is valid
- String type = task.get(Keys.TASK_TYPE);
+ TaskType type = task.getTaskType();
assertTrue("Unknown Task type \"" + type + "\" is seen in " +
"history file for task " + tid,
- (type.equals("MAP") || type.equals("REDUCE") ||
- type.equals("SETUP") || type.equals("CLEANUP")));
+ (type.equals(TaskType.MAP) ||
+ type.equals(TaskType.REDUCE) ||
+ type.equals(TaskType.JOB_CLEANUP) ||
+ type.equals(TaskType.JOB_SETUP)));
- if (type.equals("MAP")) {
- String splits = task.get(Keys.SPLITS);
+ if (type.equals(TaskType.MAP)) {
+ String splits = task.getSplitLocations();
//order in the condition OR check is important here
if (!splitsCanBeEmpty || splits.length() != 0) {
Matcher m = splitsPattern.matcher(splits);
@@ -333,103 +156,85 @@
}
// Validate task status
- String status = task.get(Keys.TASK_STATUS);
+ String status = task.getTaskStatus();
assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
- " history file for task " + tid, (status.equals("SUCCESS") ||
+ " history file for task " + tid, (status.equals("SUCCEEDED") ||
status.equals("FAILED") || status.equals("KILLED")));
}
}
// Validate format of Task Attempt Level Keys, Values read from history file
private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
- Map<String, JobHistory.Task> tasks = job.getAllTasks();
+ Map<TaskID, TaskInfo> tasks = job.getAllTasks();
// For each task
- for (JobHistory.Task task : tasks.values()) {
+ for (TaskInfo task : tasks.values()) {
// validate info of each attempt
- for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+ for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
+
+ TaskAttemptID id = attempt.getAttemptId();
+ assertNotNull(id);
+
+ long startTime = attempt.getStartTime();
+ assertTrue("Invalid Start time", startTime > 0);
- String id = attempt.get(Keys.TASK_ATTEMPT_ID);
- String time = attempt.get(Keys.START_TIME);
- assertTrue("START_TIME of task attempt " + id +
- " is in unexpected format:" + time +
- " in history file", isTimeValid(time));
-
- String time1 = attempt.get(Keys.FINISH_TIME);
- assertTrue("FINISH_TIME of task attempt " + id +
- " is in unexpected format:" + time1 +
- " in history file", isTimeValid(time1));
+ long finishTime = attempt.getFinishTime();
assertTrue("Task FINISH_TIME is < START_TIME in history file",
- areTimesInOrder(time, time1));
+ startTime < finishTime);
// Make sure that the Task type exists and it is valid
- String type = attempt.get(Keys.TASK_TYPE);
+ TaskType type = attempt.getTaskType();
assertTrue("Unknown Task type \"" + type + "\" is seen in " +
"history file for task attempt " + id,
- (type.equals("MAP") || type.equals("REDUCE") ||
- type.equals("SETUP") || type.equals("CLEANUP")));
+ (type.equals(TaskType.MAP) || type.equals(TaskType.REDUCE) ||
+ type.equals(TaskType.JOB_CLEANUP) ||
+ type.equals(TaskType.JOB_SETUP)));
// Validate task status
- String status = attempt.get(Keys.TASK_STATUS);
+ String status = attempt.getTaskStatus();
assertTrue("Unexpected TASK_STATUS \"" + status + "\" is seen in" +
" history file for task attempt " + id,
- (status.equals("SUCCESS") || status.equals("FAILED") ||
- status.equals("KILLED")));
+ (status.equals(TaskStatus.State.SUCCEEDED.toString()) ||
+ status.equals(TaskStatus.State.FAILED.toString()) ||
+ status.equals(TaskStatus.State.KILLED.toString())));
// Successful Reduce Task Attempts should have valid SHUFFLE_FINISHED
// time and SORT_FINISHED time
- if (type.equals("REDUCE") && status.equals("SUCCESS")) {
- time1 = attempt.get(Keys.SHUFFLE_FINISHED);
- assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
- " is in unexpected format:" + time1 +
- " in history file", isTimeValid(time1));
- assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
- "in history file", areTimesInOrder(time, time1));
- time = attempt.get(Keys.SORT_FINISHED);
- assertTrue("SORT_FINISHED of task attempt " + id +
- " is in unexpected format:" + time +
- " in history file", isTimeValid(time));
- assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
- " in history file", areTimesInOrder(time1, time));
+ if (type.equals(TaskType.REDUCE) &&
+ status.equals(TaskStatus.State.SUCCEEDED.toString())) {
+ long shuffleFinishTime = attempt.getShuffleFinishTime();
+ assertTrue(startTime < shuffleFinishTime);
+
+ long sortFinishTime = attempt.getSortFinishTime();
+ assertTrue(shuffleFinishTime < sortFinishTime);
}
- else if (type.equals("MAP") && status.equals("SUCCESS")) {
+ else if (type.equals(TaskType.MAP) &&
+ status.equals(TaskStatus.State.SUCCEEDED.toString())) {
// Successful MAP Task Attempts should have valid MAP_FINISHED time
- time1 = attempt.get(Keys.MAP_FINISHED);
- assertTrue("MAP_FINISHED time of task attempt " + id +
- " is in unexpected format:" + time1 +
- " in history file", isTimeValid(time1));
- assertTrue("MAP_FINISHED time of map task is < START_TIME " +
- "in history file", areTimesInOrder(time, time1));
+ long mapFinishTime = attempt.getMapFinishTime();
+ assertTrue(startTime < mapFinishTime);
}
// check if hostname is valid
- String hostname = attempt.get(Keys.HOSTNAME);
+ String hostname = attempt.getHostname();
Matcher m = hostNamePattern.matcher(hostname);
assertTrue("Unexpected Host name of task attempt " + id, m.matches());
// check if trackername is valid
- String trackerName = attempt.get(Keys.TRACKER_NAME);
+ String trackerName = attempt.getTrackerName();
m = trackerNamePattern.matcher(trackerName);
assertTrue("Unexpected tracker name of task attempt " + id,
m.matches());
if (!status.equals("KILLED")) {
// check if http port is valid
- String httpPort = attempt.get(Keys.HTTP_PORT);
- m = digitsPattern.matcher(httpPort);
- assertTrue("Unexpected http port of task attempt " + id, m.matches());
+ int httpPort = attempt.getHttpPort();
+ assertTrue(httpPort > 0);
}
// check if counters are parsable
- String counters = attempt.get(Keys.COUNTERS);
- try {
- Counters readCounters = Counters.fromEscapedCompactString(counters);
- assertTrue("Counters of task attempt " + id + " are not parsable",
- readCounters != null);
- } catch (ParseException pe) {
- LOG.warn("While trying to parse counters of task attempt " + id +
- ", " + pe);
- }
+ Counters counters = attempt.getCounters();
+ assertNotNull(counters);
}
}
}
@@ -443,9 +248,8 @@
String parts[] = path.getName().split("_");
//TODO this is a hack :(
// jobtracker-hostname_jobtracker-identifier_
- String id = parts[2] + "_" + parts[3] + "_" + parts[4];
- String jobUniqueString = parts[0] + "_" + parts[1] + "_" + id;
- return new Path(dir, jobUniqueString + "_conf.xml");
+ String id = parts[0] + "_" + parts[1] + "_" + parts[2];
+ return new Path(dir, id + "_conf.xml");
}
/**
@@ -470,12 +274,13 @@
* @param id job id
* @param conf job conf
*/
- static void validateJobHistoryFileFormat(JobID id, JobConf conf,
+ public static void validateJobHistoryFileFormat(JobHistory jobHistory,
+ JobID id, JobConf conf,
String status, boolean splitsCanBeEmpty) throws IOException {
// Get the history file name
- Path dir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = getDoneFile(conf, id, dir);
+ Path dir = jobHistory.getCompletedJobHistoryLocation();
+ String logFileName = getDoneFile(jobHistory, conf, id, dir);
// Framework history log file location
Path logFile = new Path(dir, logFileName);
@@ -484,20 +289,12 @@
// Check if the history file exists
assertTrue("History file does not exist", fileSys.exists(logFile));
-
- // check if the history file is parsable
- String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
- logFileName).split("_");
-
- String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
- JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
- TestListener l = new TestListener(jobInfo);
- JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+ JobHistoryParser parser = new JobHistoryParser(fileSys,
+ logFile.toUri().getPath());
+ JobHistoryParser.JobInfo jobInfo = parser.parse();
// validate format of job level key, values
- validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
+ validateJobLevelKeyValuesFormat(jobInfo, status);
// validate format of task level key, values
validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
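Context for the hunk above: the listener-based JobHistory parsing (DefaultJobHistoryParser plus a handle() callback over Keys maps) is replaced by JobHistoryParser, which returns a typed JobHistoryParser.JobInfo. A minimal sketch of the new read path; the wrapper class and method name are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    class HistoryReadSketch {
      // Parses a completed job's history file into a typed JobInfo.
      static JobInfo parseHistory(FileSystem fs, Path logFile)
          throws IOException {
        JobHistoryParser parser =
            new JobHistoryParser(fs, logFile.toUri().getPath());
        JobInfo info = parser.parse();
        // Typed accessors replace the old Keys map lookups, e.g.
        // info.getSubmitTime(), info.getJobStatus(), info.getAllTasks().
        return info;
      }
    }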
@@ -507,9 +304,10 @@
// check if all the TaskAttempts, Tasks started are finished for
// successful jobs
- if (status.equals("SUCCESS")) {
+ if (status.equals("SUCCEEDED")) {
// Make sure that the lists in taskIDsToAttemptIDs are empty.
- for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
+ for(Iterator<String> it =
+ taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
String taskid = it.next();
assertTrue("There are some Tasks which are not finished in history " +
"file.", taskEnds.contains(taskid));
@@ -530,73 +328,68 @@
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
- Map<Keys, String> values = jobInfo.getValues();
-
assertTrue("SUBMIT_TIME of job obtained from history file did not " +
"match the expected value", jip.getStartTime() ==
- Long.parseLong(values.get(Keys.SUBMIT_TIME)));
+ jobInfo.getSubmitTime());
assertTrue("LAUNCH_TIME of job obtained from history file did not " +
"match the expected value", jip.getLaunchTime() ==
- Long.parseLong(values.get(Keys.LAUNCH_TIME)));
+ jobInfo.getLaunchTime());
assertTrue("FINISH_TIME of job obtained from history file did not " +
"match the expected value", jip.getFinishTime() ==
- Long.parseLong(values.get(Keys.FINISH_TIME)));
+ jobInfo.getFinishTime());
assertTrue("Job Status of job obtained from history file did not " +
"match the expected value",
- values.get(Keys.JOB_STATUS).equals("SUCCESS"));
+ jobInfo.getJobStatus().equals("SUCCEEDED"));
assertTrue("Job Priority of job obtained from history file did not " +
"match the expected value", jip.getPriority().toString().equals(
- values.get(Keys.JOB_PRIORITY)));
+ jobInfo.getPriority()));
assertTrue("Job Name of job obtained from history file did not " +
- "match the expected value", JobInfo.getJobName(conf).equals(
- values.get(Keys.JOBNAME)));
+ "match the expected value",
+ conf.getJobName().equals(
+ jobInfo.getJobname()));
assertTrue("User Name of job obtained from history file did not " +
- "match the expected value", JobInfo.getUserName(conf).equals(
- values.get(Keys.USER)));
+ "match the expected value",
+ conf.getUser().equals(
+ jobInfo.getUsername()));
// Validate job counters
- Counters c = jip.getCounters();
+ Counters c = new Counters(jip.getCounters());
+ Counters jiCounters = jobInfo.getTotalCounters();
assertTrue("Counters of job obtained from history file did not " +
"match the expected value",
- c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
+ c.equals(jiCounters));
// Validate number of total maps, total reduces, finished maps,
// finished reduces, failed maps, failed reduces
- String totalMaps = values.get(Keys.TOTAL_MAPS);
assertTrue("Unexpected number of total maps in history file",
- Integer.parseInt(totalMaps) == jip.desiredMaps());
+ jobInfo.getTotalMaps() == jip.desiredMaps());
- String totalReduces = values.get(Keys.TOTAL_REDUCES);
assertTrue("Unexpected number of total reduces in history file",
- Integer.parseInt(totalReduces) == jip.desiredReduces());
+ jobInfo.getTotalReduces() == jip.desiredReduces());
- String finMaps = values.get(Keys.FINISHED_MAPS);
assertTrue("Unexpected number of finished maps in history file",
- Integer.parseInt(finMaps) == jip.finishedMaps());
+ jobInfo.getFinishedMaps() == jip.finishedMaps());
- String finReduces = values.get(Keys.FINISHED_REDUCES);
assertTrue("Unexpected number of finished reduces in history file",
- Integer.parseInt(finReduces) == jip.finishedReduces());
+ jobInfo.getFinishedReduces() == jip.finishedReduces());
- String failedMaps = values.get(Keys.FAILED_MAPS);
assertTrue("Unexpected number of failed maps in history file",
- Integer.parseInt(failedMaps) == jip.failedMapTasks);
+ jobInfo.getFailedMaps() == jip.failedMapTasks);
- String failedReduces = values.get(Keys.FAILED_REDUCES);
assertTrue("Unexpected number of failed reduces in history file",
- Integer.parseInt(failedReduces) == jip.failedReduceTasks);
+ jobInfo.getFailedReduces() == jip.failedReduceTasks);
}
// Validate Task Level Keys, Values read from history file by
// comparing them with the actual values from JT.
private static void validateTaskLevelKeyValues(MiniMRCluster mr,
- RunningJob job, JobInfo jobInfo) throws IOException {
+ RunningJob job, JobInfo jobInfo) throws IOException {
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
@@ -606,7 +399,7 @@
TaskID mapTaskId = new TaskID(job.getID(), TaskType.MAP, 0);
TaskID reduceTaskId = new TaskID(job.getID(), TaskType.REDUCE, 0);
- TaskInProgress cleanups[] = jip.getCleanupTasks();
+ TaskInProgress cleanups[] = jip.cleanup;
TaskID cleanupTaskId;
if (cleanups[0].isComplete()) {
cleanupTaskId = cleanups[0].getTIPId();
@@ -615,7 +408,7 @@
cleanupTaskId = cleanups[1].getTIPId();
}
- TaskInProgress setups[] = jip.getSetupTasks();
+ TaskInProgress setups[] = jip.setup;
TaskID setupTaskId;
if (setups[0].isComplete()) {
setupTaskId = setups[0].getTIPId();
@@ -624,53 +417,45 @@
setupTaskId = setups[1].getTIPId();
}
- Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+ Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
- // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
- for (JobHistory.Task task : tasks.values()) {
+ // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
- String tid = task.get(Keys.TASKID);
- if (tid.equals(mapTaskId.toString()) ||
- tid.equals(reduceTaskId.toString()) ||
- tid.equals(cleanupTaskId.toString()) ||
- tid.equals(setupTaskId.toString())) {
-
- TaskID taskId = null;
- if (tid.equals(mapTaskId.toString())) {
- taskId = mapTaskId;
- }
- else if (tid.equals(reduceTaskId.toString())) {
- taskId = reduceTaskId;
- }
- else if (tid.equals(cleanupTaskId.toString())) {
- taskId = cleanupTaskId;
- }
- else if (tid.equals(setupTaskId.toString())) {
- taskId = setupTaskId;
- }
- TaskInProgress tip = jip.getTaskInProgress(taskId);
+ for (TaskInfo task : tasks.values()) {
+ TaskID tid = task.getTaskId();
+
+ if (tid.equals(mapTaskId) ||
+ tid.equals(reduceTaskId) ||
+ tid.equals(cleanupTaskId) ||
+ tid.equals(setupTaskId)) {
+
+ TaskInProgress tip = jip.getTaskInProgress
+ (org.apache.hadoop.mapred.TaskID.downgrade(tid));
assertTrue("START_TIME of Task " + tid + " obtained from history " +
- "file did not match the expected value", tip.getExecStartTime() ==
- Long.parseLong(task.get(Keys.START_TIME)));
+ "file did not match the expected value",
+ tip.getExecStartTime() ==
+ task.getStartTime());
assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
- "file did not match the expected value", tip.getExecFinishTime() ==
- Long.parseLong(task.get(Keys.FINISH_TIME)));
+ "file did not match the expected value",
+ tip.getExecFinishTime() ==
+ task.getFinishTime());
- if (taskId == mapTaskId) {//check splits only for map task
+ if (tid == mapTaskId) {//check splits only for map task
assertTrue("Splits of Task " + tid + " obtained from history file " +
- " did not match the expected value",
- tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
+ " did not match the expected value",
+ tip.getSplitNodes().equals(task.getSplitLocations()));
}
TaskAttemptID attemptId = tip.getSuccessfulTaskid();
- TaskStatus ts = tip.getTaskStatus(attemptId);
+ TaskStatus ts = tip.getTaskStatus(
+ org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
// Validate task counters
- Counters c = ts.getCounters();
+ Counters c = new Counters(ts.getCounters());
assertTrue("Counters of Task " + tid + " obtained from history file " +
- " did not match the expected value",
- c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
+ " did not match the expected value",
+ c.equals(task.getCounters()));
}
}
}
@@ -683,78 +468,83 @@
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
- Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
+ Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks();
// For each task
- for (JobHistory.Task task : tasks.values()) {
+ for (TaskInfo task : tasks.values()) {
// validate info of each attempt
- for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+ for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
- String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
- TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
+ TaskAttemptID attemptId = attempt.getAttemptId();
TaskID tid = attemptId.getTaskID();
- // Validate task id
- assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
- "history file did not match the expected value",
- tid.toString().equals(attempt.get(Keys.TASKID)));
-
- TaskInProgress tip = jip.getTaskInProgress(tid);
- TaskStatus ts = tip.getTaskStatus(attemptId);
+ TaskInProgress tip = jip.getTaskInProgress
+ (org.apache.hadoop.mapred.TaskID.downgrade(tid));
+
+ TaskStatus ts = tip.getTaskStatus(
+ org.apache.hadoop.mapred.TaskAttemptID.downgrade(attemptId));
// Validate task attempt start time
- assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
- "history file did not match the expected value",
- ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
+ assertTrue("START_TIME of Task attempt " + attemptId +
+ " obtained from " +
+ "history file did not match the expected value",
+ ts.getStartTime() == attempt.getStartTime());
// Validate task attempt finish time
- assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
- "history file did not match the expected value",
- ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
+ assertTrue("FINISH_TIME of Task attempt " + attemptId +
+ " obtained from " +
+ "history file " + ts.getFinishTime() +
+ " did not match the expected value, " +
+ attempt.getFinishTime(),
+ ts.getFinishTime() == attempt.getFinishTime());
- TaskTrackerStatus ttStatus = jt.getTaskTrackerStatus(ts.getTaskTracker());
+ TaskTrackerStatus ttStatus =
+ jt.getTaskTrackerStatus(ts.getTaskTracker());
if (ttStatus != null) {
- assertTrue("http port of task attempt " + idStr + " obtained from " +
+ assertTrue("http port of task attempt " + attemptId +
+ " obtained from " +
"history file did not match the expected value",
ttStatus.getHttpPort() ==
- Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
+ attempt.getHttpPort());
- if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+ if (attempt.getTaskStatus().equals("SUCCEEDED")) {
String ttHostname = jt.getNode(ttStatus.getHost()).toString();
// check if hostname is valid
- assertTrue("Host name of task attempt " + idStr + " obtained from" +
+ assertTrue("Host name of task attempt " + attemptId +
+ " obtained from" +
" history file did not match the expected value",
- ttHostname.equals(attempt.get(Keys.HOSTNAME)));
+ ttHostname.equals(attempt.getHostname()));
}
}
- if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
+ if (attempt.getTaskStatus().equals("SUCCEEDED")) {
// Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
// Reduce Task Attempts
- if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
- assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
+ if (attempt.getTaskType().equals("REDUCE")) {
+ assertTrue("SHUFFLE_FINISHED time of task attempt " + attemptId +
" obtained from history file did not match the expected" +
" value", ts.getShuffleFinishTime() ==
- Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
- assertTrue("SORT_FINISHED time of task attempt " + idStr +
+ attempt.getShuffleFinishTime());
+ assertTrue("SORT_FINISHED time of task attempt " + attemptId +
" obtained from history file did not match the expected" +
" value", ts.getSortFinishTime() ==
- Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
+ attempt.getSortFinishTime());
}
//Validate task counters
- Counters c = ts.getCounters();
- assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
+ Counters c = new Counters(ts.getCounters());
+ assertTrue("Counters of Task Attempt " + attemptId + " obtained from " +
"history file did not match the expected value",
- c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
+ c.equals(attempt.getCounters()));
}
// check if tracker name is valid
- assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
+ assertTrue("Tracker name of task attempt " + attemptId +
+ " obtained from " +
"history file did not match the expected value",
- ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
+ ts.getTaskTracker().equals(attempt.getTrackerName()));
}
}
}
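
The attempt-level validation follows the same migration, and counters are now
compared with Counters.equals rather than escaped compact strings. A short
sketch under the same assumptions (task is a TaskInfo from
jobInfo.getAllTasks()):

    // Walk all attempts of a task through the typed TaskAttemptInfo records.
    for (TaskAttemptInfo attempt : task.getAllTaskAttempts().values()) {
      TaskAttemptID attemptId = attempt.getAttemptId();
      if ("SUCCEEDED".equals(attempt.getTaskStatus())
          && "REDUCE".equals(attempt.getTaskType())) {
        long shuffleDone = attempt.getShuffleFinishTime(); // was Keys.SHUFFLE_FINISHED
        long sortDone = attempt.getSortFinishTime();       // was Keys.SORT_FINISHED
      }
    }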
@@ -766,13 +556,15 @@
* @param job RunningJob object of the job whose history is to be validated
* @param conf job conf
*/
- static void validateJobHistoryFileContent(MiniMRCluster mr,
+ public static void validateJobHistoryFileContent(MiniMRCluster mr,
RunningJob job, JobConf conf) throws IOException {
JobID id = job.getID();
- Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ JobHistory jobHistory =
+ mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ Path doneDir = jobHistory.getCompletedJobHistoryLocation();
// Get the history file name
- String logFileName = getDoneFile(conf, id, doneDir);
+ String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -781,18 +573,10 @@
// Check if the history file exists
assertTrue("History file does not exist", fileSys.exists(logFile));
-
- // check if the history file is parsable
- String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
- logFileName).split("_");
-
- String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
- JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
- DefaultJobHistoryParser.JobTasksParseListener l =
- new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
- JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
-
+ JobHistoryParser parser = new JobHistoryParser(fileSys,
+ logFile.toUri().getPath());
+
+ JobHistoryParser.JobInfo jobInfo = parser.parse();
// Now the history file contents are available in jobInfo. Let us compare
// them with the actual values from JT.
validateJobLevelKeyValues(mr, job, jobInfo, conf);
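
The parse step itself shrinks from file-name decoding plus a
DefaultJobHistoryParser listener to a single JobHistoryParser call. A sketch,
assuming fileSys and logFile as in the surrounding method:

    // Parse the completed-job history file into a typed JobInfo.
    JobHistoryParser parser =
        new JobHistoryParser(fileSys, logFile.toUri().getPath());
    JobHistoryParser.JobInfo jobInfo = parser.parse();
    Map<TaskID, TaskInfo> tasks = jobInfo.getAllTasks(); // typed task records
    String status = jobInfo.getJobStatus();              // was Keys.JOB_STATUS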
@@ -800,7 +584,7 @@
validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
}
- public void testDoneFolderOnHDFS() throws IOException {
+ public void testDoneFolderOnHDFS() throws IOException, InterruptedException {
MiniMRCluster mr = null;
try {
JobConf conf = new JobConf();
@@ -810,12 +594,42 @@
//set the done folder location
String doneFolder = "history_done";
- conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+ conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
+ String logDir =
+ "file:///" + new File(System.getProperty("hadoop.log.dir")).
+ getAbsolutePath() + File.separator + "history";
+
+ Path logDirPath = new Path(logDir);
+ FileSystem logDirFs = logDirPath.getFileSystem(conf);
+ // there may be some stale files; clean them
+ if (logDirFs.exists(logDirPath)) {
+ boolean deleted = logDirFs.delete(logDirPath, true);
+ LOG.info(logDirPath + " deleted " + deleted);
+ }
+
+ logDirFs.mkdirs(logDirPath);
+ assertEquals("No of file in logDir not correct", 0,
+ logDirFs.listStatus(logDirPath).length);
+ logDirFs.create(new Path(logDirPath, "f1"));
+ logDirFs.create(new Path(logDirPath, "f2"));
+ assertEquals("No of file in logDir not correct", 2,
+ logDirFs.listStatus(logDirPath).length);
+
MiniDFSCluster dfsCluster = new MiniDFSCluster(conf, 2, true, null);
mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
3, null, null, conf);
+ assertEquals("Files in logDir did not move to DONE folder",
+ 0, logDirFs.listStatus(logDirPath).length);
+
+ JobHistory jobHistory =
+ mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+
+ assertEquals("Files in DONE dir not correct",
+ 2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+
// run the TCs
conf = mr.createJobConf();
@@ -836,15 +650,18 @@
// Run a job that will be succeeded and validate its history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- Path doneDir = JobHistory.getCompletedJobHistoryLocation();
assertEquals("History DONE folder not correct",
doneFolder, doneDir.getName());
JobID id = job.getID();
- String logFileName = getDoneFile(conf, id, doneDir);
+ String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
+
+ Cluster cluster = new Cluster(conf);
+ assertEquals("Client returned wrong history url", logFile.toString(),
+ cluster.getJobHistoryUrl(id));
// Check if the history file exists
assertTrue("History file does not exist", fileSys.exists(logFile));
@@ -874,7 +691,8 @@
assertFalse("Config for completed jobs not deleted from running folder",
fileSys.exists(runningJobConfFilename));
- validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+ validateJobHistoryFileFormat(jobHistory,
+ job.getID(), conf, "SUCCEEDED", false);
validateJobHistoryFileContent(mr, job, conf);
// get the job conf filename
@@ -899,7 +717,7 @@
//set the done folder location
String doneFolder = TEST_ROOT_DIR + "history_done";
- conf.set("mapred.job.tracker.history.completed.location", doneFolder);
+ conf.set(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION, doneFolder);
mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
@@ -922,12 +740,13 @@
// Run a job that will be succeeded and validate its history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
-
- Path doneDir = JobHistory.getCompletedJobHistoryLocation();
+ JobHistory jobHistory =
+ mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ Path doneDir = jobHistory.getCompletedJobHistoryLocation();
assertEquals("History DONE folder not correct",
doneFolder, doneDir.toString());
JobID id = job.getID();
- String logFileName = getDoneFile(conf, id, doneDir);
+ String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
@@ -941,14 +760,13 @@
assertTrue("Config for completed jobs doesnt exist",
fileSys.exists(confFile));
- // check if the file exists in a done folder
+ // check if the conf file exists in a done folder
assertTrue("Completed job config doesnt exist in the done folder",
doneDir.getName().equals(confFile.getParent().getName()));
// check if the file exists in a done folder
assertTrue("Completed jobs doesnt exist in the done folder",
doneDir.getName().equals(logFile.getParent().getName()));
-
// check if the job file is removed from the history location
Path runningJobsHistoryFolder = logFile.getParent().getParent();
@@ -961,11 +779,13 @@
assertFalse("Config for completed jobs not deleted from running folder",
fileSys.exists(runningJobConfFilename));
- validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
+ validateJobHistoryFileFormat(jobHistory, job.getID(), conf,
+ "SUCCEEDED", false);
validateJobHistoryFileContent(mr, job, conf);
// get the job conf filename
- String name = JobHistory.JobInfo.getLocalJobFilePath(job.getID());
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ String name = jt.getLocalJobFilePath(job.getID());
File file = new File(name);
// check if the file get deleted
@@ -983,123 +803,21 @@
// Returns the file in the done folder
// Waits for some time for the file to be moved to done
- private static String getDoneFile(JobConf conf, JobID id,
+ private static String getDoneFile(JobHistory jobHistory,
+ JobConf conf, JobID id,
Path doneDir) throws IOException {
String name = null;
for (int i = 0; name == null && i < 20; i++) {
- name = JobHistory.JobInfo.getDoneJobHistoryFileName(conf, id);
+ Path path = JobHistory.getJobHistoryFile(
+ jobHistory.getCompletedJobHistoryLocation(), id, conf.getUser());
+ if (path.getFileSystem(conf).exists(path)) {
+ name = path.toString();
+ }
UtilsForTests.waitFor(1000);
}
+ assertNotNull("Job history file not created", name);
return name;
}
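
The reworked helper now polls for the concrete history-file path rather than
asking JobHistory.JobInfo for a done-file name. A sketch of the path it waits
on, with jobHistory fetched from the JobTracker as elsewhere in this test:

    // Build the expected done-folder path for the job and probe for it.
    Path done = jobHistory.getCompletedJobHistoryLocation();
    Path historyFile = JobHistory.getJobHistoryFile(done, id, conf.getUser());
    boolean ready = historyFile.getFileSystem(conf).exists(historyFile);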
- // Returns the output path where user history log file is written to with
- // default configuration setting for hadoop.job.history.user.location
- private static Path getLogLocationInOutputPath(String logFileName,
- JobConf conf) {
- JobConf jobConf = new JobConf(true);//default JobConf
- FileOutputFormat.setOutputPath(jobConf,
- FileOutputFormat.getOutputPath(conf));
- return JobHistory.JobInfo.getJobHistoryLogLocationForUser(
- logFileName, jobConf);
- }
-
- /**
- * Checks if the user history file exists in the correct dir
- * @param id job id
- * @param conf job conf
- */
- static void validateJobHistoryUserLogLocation(JobID id, JobConf conf)
- throws IOException {
- // Get the history file name
- Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = getDoneFile(conf, id, doneDir);
-
- // User history log file location
- Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
- logFileName, conf);
- if(logFile == null) {
- // get the output path where history file is written to when
- // hadoop.job.history.user.location is not set
- logFile = getLogLocationInOutputPath(logFileName, conf);
- }
- FileSystem fileSys = null;
- fileSys = logFile.getFileSystem(conf);
-
- // Check if the user history file exists in the correct dir
- if (conf.get("hadoop.job.history.user.location") == null) {
- assertTrue("User log file " + logFile + " does not exist",
- fileSys.exists(logFile));
- }
- else if ("none".equals(conf.get("hadoop.job.history.user.location"))) {
- // history file should not exist in the output path
- assertFalse("Unexpected. User log file exists in output dir when " +
- "hadoop.job.history.user.location is set to \"none\"",
- fileSys.exists(logFile));
- }
- else {
- //hadoop.job.history.user.location is set to a specific location.
- // User log file should exist in that location
- assertTrue("User log file " + logFile + " does not exist",
- fileSys.exists(logFile));
-
- // User log file should not exist in output path.
-
- // get the output path where history file is written to when
- // hadoop.job.history.user.location is not set
- Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
-
- if (logFile != logFile1) {
- fileSys = logFile1.getFileSystem(conf);
- assertFalse("Unexpected. User log file exists in output dir when " +
- "hadoop.job.history.user.location is set to a different location",
- fileSys.exists(logFile1));
- }
- }
- }
-
- // Validate user history file location for the given values of
- // hadoop.job.history.user.location as
- // (1)null(default case), (2)"none", and (3)some user specified dir.
- public void testJobHistoryUserLogLocation() throws IOException {
- MiniMRCluster mr = null;
- try {
- mr = new MiniMRCluster(2, "file:///", 3);
-
- // run the TCs
- JobConf conf = mr.createJobConf();
-
- FileSystem fs = FileSystem.get(conf);
- // clean up
- fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
-
- Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
- Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
-
- // validate for the case of null(default)
- RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- validateJobHistoryUserLogLocation(job.getID(), conf);
-
- inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
- outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
- // validate for the case of "none"
- conf.set("hadoop.job.history.user.location", "none");
- job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- validateJobHistoryUserLogLocation(job.getID(), conf);
-
- inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
- outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
- // validate for the case of any dir
- conf.set("hadoop.job.history.user.location", TEST_ROOT_DIR + "/succeed");
- job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- validateJobHistoryUserLogLocation(job.getID(), conf);
-
- } finally {
- if (mr != null) {
- cleanupLocalFiles(mr);
- mr.shutdown();
- }
- }
- }
private void cleanupLocalFiles(MiniMRCluster mr)
throws IOException {
@@ -1108,7 +826,9 @@
Path sysDir = new Path(jt.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
fs.delete(sysDir, true);
- Path jobHistoryDir = JobHistory.getJobHistoryLocation();
+ Path jobHistoryDir =
+ mr.getJobTrackerRunner().getJobTracker().getJobHistory().
+ getJobHistoryLocation();
fs = jobHistoryDir.getFileSystem(conf);
fs.delete(jobHistoryDir, true);
}
@@ -1118,13 +838,13 @@
* @param id job id
* @param conf job conf
*/
- private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
- String status) throws IOException {
+ private static void validateJobHistoryJobStatus(JobHistory jobHistory,
+ JobID id, JobConf conf, String status) throws IOException {
// Get the history file name
- Path doneDir = JobHistory.getCompletedJobHistoryLocation();
- String logFileName = getDoneFile(conf, id, doneDir);
-
+ Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+ String logFileName = getDoneFile(jobHistory, conf, id, doneDir);
+
// Framework history log file location
Path logFile = new Path(doneDir, logFileName);
FileSystem fileSys = logFile.getFileSystem(conf);
@@ -1137,19 +857,13 @@
fileSys.getFileStatus(logFile).getPermission().equals(
new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
- // check if the history file is parsable
- String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
- logFileName).split("_");
-
- String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
- JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
-
- DefaultJobHistoryParser.JobTasksParseListener l =
- new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
- JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
+ JobHistoryParser parser = new JobHistoryParser(fileSys,
+ logFile.toUri().getPath());
+ JobHistoryParser.JobInfo jobInfo = parser.parse();
+
assertTrue("Job Status read from job history file is not the expected" +
- " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
+ " status", status.equals(jobInfo.getJobStatus()));
}
// run jobs that will (1) succeed, (2) fail, and (3) be killed
@@ -1172,22 +886,86 @@
// Run a job that will be succeeded and validate its job status
// existing in history file
RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
- validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
- long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
- assertTrue(historyCleanerRanAt != 0);
+
+ JobHistory jobHistory =
+ mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+ validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ JobStatus.getJobRunState(JobStatus.SUCCEEDED));
// Run a job that will be failed and validate its job status
// existing in history file
job = UtilsForTests.runJobFail(conf, inDir, outDir);
- validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
- assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+ validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ JobStatus.getJobRunState(JobStatus.FAILED));
// Run a job that will be killed and validate its job status
// existing in history file
job = UtilsForTests.runJobKill(conf, inDir, outDir);
- validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
- assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
+ validateJobHistoryJobStatus(jobHistory, job.getID(), conf,
+ JobStatus.getJobRunState(JobStatus.KILLED));
+
+ } finally {
+ if (mr != null) {
+ cleanupLocalFiles(mr);
+ mr.shutdown();
+ }
+ }
+ }
+
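
Note that the expected status strings above are no longer hard-coded: they
come from JobStatus.getJobRunState, which maps a numeric run state to its
name. A sketch of the mapping these assertions rely on (the exact strings are
an assumption):

    // Run-state constants resolve to their string names, e.g.:
    String s = JobStatus.getJobRunState(JobStatus.SUCCEEDED); // presumably "SUCCEEDED"
    String f = JobStatus.getJobRunState(JobStatus.FAILED);    // presumably "FAILED"
    String k = JobStatus.getJobRunState(JobStatus.KILLED);    // presumably "KILLED"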
+ public void testHistoryInitWithCorruptFiles() throws IOException {
+ MiniMRCluster mr = null;
+ try {
+ JobConf conf = new JobConf();
+ Path historyDir = new Path(System.getProperty("test.build.data", "."),
+ "history");
+ conf.set(JTConfig.JT_JOBHISTORY_LOCATION,
+ historyDir.toString());
+ conf.setUser("user");
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // there may be some stale files; clean them
+ if (localFs.exists(historyDir)) {
+ boolean deleted = localFs.delete(historyDir, true);
+ LOG.info(historyDir + " deleted " + deleted);
+ }
+
+ // Start the cluster, create a history file
+ mr = new MiniMRCluster(0, "file:///", 3, null, null, conf);
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ JobHistory jh = jt.getJobHistory();
+ final JobID jobId = JobID.forName("job_200809171136_0001");
+ jh.setupEventWriter(jobId, conf);
+ JobSubmittedEvent jse =
+ new JobSubmittedEvent(jobId, "job", "user", 12345, "path");
+ jh.logEvent(jse, jobId);
+ jh.closeWriter(jobId);
+
+ // Corrupt the history file. Use RawLocalFileSystem so that we
+ // keep the original CRC file intact.
+ String historyFileName = jobId.toString() + "_" + "user";
+ Path historyFilePath = new Path(historyDir.toString(), historyFileName);
+
+ RawLocalFileSystem fs = (RawLocalFileSystem)
+ FileSystem.getLocal(conf).getRaw();
+
+ FSDataOutputStream out = fs.create(historyFilePath, true);
+ byte[] corruptData = new byte[32];
+ new Random().nextBytes(corruptData);
+ out.write(corruptData, 0, 32);
+ out.close();
+
+ // Stop and start the tracker. The tracker should come up nicely
+ mr.stopJobTracker();
+ mr.startJobTracker();
+ jt = mr.getJobTrackerRunner().getJobTracker();
+ assertNotNull("JobTracker did not come up", jt );
+ jh = jt.getJobHistory();
+ assertNotNull("JobHistory did not get initialized correctly", jh);
+
+ // Only the done folder should remain in the history directory
+ assertEquals("Files in logDir did not move to DONE folder",
+ 1, historyDir.getFileSystem(conf).listStatus(historyDir).length);
} finally {
if (mr != null) {
cleanupLocalFiles(mr);