You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [31/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Sat Nov 28 20:26:01 2009
@@ -22,17 +22,16 @@
import java.util.List;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import junit.extensions.TestSetup;
-import junit.framework.Test;
import junit.framework.TestCase;
-import junit.framework.TestSuite;
public class TestSetupTaskScheduling extends TestCase {
@@ -61,7 +60,7 @@
@Override
public synchronized void initTasks() throws IOException {
super.initTasks();
- JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+ Job.RawSplit emptySplit = new Job.RawSplit();
setup = new TaskInProgress[2];
setup[0] = new TaskInProgress(getJobID(), "test", emptySplit,
jobtracker, getJobConf(), this, numMapTasks + 1, 1);
@@ -95,10 +94,55 @@
}
}
+ static class FakeJobWithTaskCleanupTask
+ extends FakeObjectUtilities.FakeJobInProgress {
+
+ FakeJobWithTaskCleanupTask(JobConf jobConf,
+ JobTracker tracker) throws IOException {
+ super(jobConf, tracker);
+ }
+
+ /**
+ * Initialize tasks(1 map and 1 reduce task each needs 2 slots, similar to
+ * tasks of a high RAM job).
+ */
+ @Override
+ public synchronized void initTasks() throws IOException {
+ super.initTasks();
+ Job.RawSplit emptySplit = new Job.RawSplit();
+ final int numSlotsPerTask = 2;
+ maps = new TaskInProgress[1];
+ reduces = new TaskInProgress[1];
+
+ maps[0] = new FakeTaskInProgress(getJobID(), "test", emptySplit,
+ jobtracker, getJobConf(), this, 0, numSlotsPerTask);
+ TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
+
+ // make this task a taskCleanup task of a map task
+ mapCleanupTasks.add(attemptId);
+ TaskStatus stat = new MapTaskStatus(attemptId, 0.01f, 2,
+ TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+ TaskStatus.Phase.MAP, new Counters());
+ maps[0].updateStatus(stat);
+
+ //similarly for reduce task's taskCleanup task
+ reduces[0] = new FakeTaskInProgress(getJobID(), "test", 1,
+ 0, jobtracker, getJobConf(), this, numSlotsPerTask);
+ attemptId = new TaskAttemptID(reduces[0].getTIPId(), 0);
+
+ // make this task a taskCleanup task of a reduce task
+ reduceCleanupTasks.add(attemptId);
+ stat = new ReduceTaskStatus(attemptId, 0.01f, 2,
+ TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
+ TaskStatus.Phase.REDUCE, new Counters());
+ reduces[0].updateStatus(stat);
+ }
+ }
+
public void setUp() throws Exception {
JobConf conf = new JobConf();
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
@@ -106,36 +150,47 @@
}
// create a job for testing setup tasks and reservations
- FakeJobInProgress createJob(boolean withSetup) throws IOException {
+ FakeJobInProgress createJob(TaskType taskType) throws IOException {
JobConf conf = new JobConf();
conf.setSpeculativeExecution(false);
conf.setNumMapTasks(2);
conf.setNumReduceTasks(2);
- conf.set("mapred.max.reduce.failures.percent", ".70");
- conf.set("mapred.max.map.failures.percent", ".70");
+ conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+ conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
FakeJobInProgress job = null;
- if (withSetup) {
- job = new FakeJobWithSetupTask(conf, jobTracker);
- } else {
- conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+ if (taskType == null) {
+ conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
job = new FakeJobInProgress(conf, jobTracker);
+ } else if (taskType == TaskType.JOB_SETUP) {
+ job = new FakeJobWithSetupTask(conf, jobTracker);
+ } else if (taskType == TaskType.TASK_CLEANUP) {
+ job = new FakeJobWithTaskCleanupTask(conf, jobTracker);
}
job.setClusterSize(trackers.length);
job.initTasks();
return job;
}
- // create a new TaskStatus and add to a list of status objects
- void addNewTaskStatus(FakeJobInProgress job,
- boolean isMapTask, String tracker, List<TaskStatus> reports)
+ // create a new TaskStatus and add to a list of status objects.
+ // useMapSlot param is needed only when taskType is TASK_CLEANUP.
+ void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
+ boolean useMapSlot, String tracker, List<TaskStatus> reports)
throws IOException {
TaskAttemptID task = null;
TaskStatus status = null;
- if (isMapTask) {
+ if (taskType == TaskType.MAP) {
task = job.findMapTask(tracker);
status = new MapTaskStatus(task, 0.01f, 2,
TaskStatus.State.RUNNING, "", "", tracker,
TaskStatus.Phase.MAP, new Counters());
+ } else if (taskType == TaskType.TASK_CLEANUP) {
+ if (useMapSlot) {
+ status = job.maps[0].taskStatuses.get(
+ new TaskAttemptID(job.maps[0].getTIPId(), 0));
+ } else {
+ status = job.reduces[0].taskStatuses.get(
+ new TaskAttemptID(job.reduces[0].getTIPId(), 0));
+ }
} else {
task = job.findReduceTask(tracker);
status = new ReduceTaskStatus(task, 0.01f, 2,
@@ -162,7 +217,7 @@
*/
public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
// create a job with a setup task.
- FakeJobInProgress job = createJob(true);
+ FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
jobTracker.jobs.put(job.getJobID(), job);
// create a status simulating a free tasktracker
@@ -185,16 +240,16 @@
*/
public void testMapSlotsCountedForSetup() throws IOException {
// create a job with a setup task.
- FakeJobInProgress job = createJob(true);
+ FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
jobTracker.jobs.put(job.getJobID(), job);
// create another job for reservation
- FakeJobInProgress job1 = createJob(false);
+ FakeJobInProgress job1 = createJob(null);
jobTracker.jobs.put(job1.getJobID(), job1);
// create TT status for testing getSetupAndCleanupTasks
List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
- addNewTaskStatus(job, true, trackers[0], taskStatuses);
+ addNewTaskStatus(job, TaskType.MAP, true, trackers[0], taskStatuses);
TaskTrackerStatus ttStatus
= createTaskTrackerStatus(trackers[0], taskStatuses);
@@ -213,19 +268,19 @@
*/
public void testReduceSlotsCountedForSetup() throws IOException {
// create a job with a setup task.
- FakeJobInProgress job = createJob(true);
+ FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
jobTracker.jobs.put(job.getJobID(), job);
// create another job for reservation
- FakeJobInProgress job1 = createJob(false);
+ FakeJobInProgress job1 = createJob(null);
jobTracker.jobs.put(job1.getJobID(), job1);
// create TT status for testing getSetupAndCleanupTasks
List<TaskStatus> reports = new ArrayList<TaskStatus>();
// because free map slots are checked first in code,
// we fill up map slots also.
- addNewTaskStatus(job1, true, trackers[1], reports);
- addNewTaskStatus(job1, false, trackers[1], reports);
+ addNewTaskStatus(job1, TaskType.MAP, true, trackers[1], reports);
+ addNewTaskStatus(job1, TaskType.REDUCE, false,trackers[1], reports);
TaskTrackerStatus ttStatus
= createTaskTrackerStatus(trackers[1], reports);
@@ -235,4 +290,48 @@
assertNull(tasks);
jobTracker.jobs.clear();
}
+
+ void validateNumSlotsUsedForTaskCleanup(TaskTrackerStatus ttStatus)
+ throws IOException {
+ List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
+
+ assertEquals("Actual number of taskCleanup tasks is not same as expected", 1, tasks.size());
+ LOG.info("taskCleanup task is " + tasks.get(0));
+ assertTrue(tasks.get(0).isTaskCleanupTask());
+
+ // slots needed for taskCleanup task should be 1(even for high RAM jobs)
+ assertEquals("TaskCleanup task should not need more than 1 slot.",
+ 1, tasks.get(0).getNumSlotsRequired());
+ }
+
+ /**
+ * Test to check that map slots are counted when returning
+ * a taskCleanup task.
+ * @throws IOException
+ */
+ public void testNumSlotsUsedForTaskCleanup() throws IOException {
+ // Create a high RAM job with a map task's cleanup task and a reduce task's
+ // cleanup task. Make this Fake job a high RAM job by setting the slots
+ // required for map/reduce task to 2.
+ FakeJobInProgress job = createJob(TaskType.TASK_CLEANUP);
+ jobTracker.jobs.put(job.getJobID(), job);
+
+ // create TT status for testing getSetupAndCleanupTasks
+ List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
+ TaskTrackerStatus ttStatus =
+ createTaskTrackerStatus(trackers[0], taskStatuses);//create dummy status
+ addNewTaskStatus(job, TaskType.TASK_CLEANUP, true, trackers[0],
+ taskStatuses);// status of map task's cleanup task
+ addNewTaskStatus(job, TaskType.TASK_CLEANUP, false, trackers[0],
+ taskStatuses);// status of reduce task's cleanup task
+ ttStatus = createTaskTrackerStatus(trackers[0], taskStatuses);
+
+ // validate mapTaskCleanup task
+ validateNumSlotsUsedForTaskCleanup(ttStatus);
+
+ // validate reduceTaskCleanup task
+ validateNumSlotsUsedForTaskCleanup(ttStatus);
+
+ jobTracker.jobs.clear();
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Sat Nov 28 20:26:01 2009
@@ -44,6 +44,7 @@
import org.apache.hadoop.mapred.UtilsForTests.KillMapper;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
/**
@@ -100,7 +101,7 @@
// it with the MiniMRCluster
myListener = new MyListener();
- conf.set("mapred.job.tracker.handler.count", "1");
+ conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
mrCluster = new MiniMRCluster(0, 0,
numTT, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI, new JobConf());
@@ -151,7 +152,7 @@
private void verifyOutput(FileSystem fs, Path outDir) throws IOException {
Path[] outputFiles = FileUtil.stat2Paths(
- fs.listStatus(outDir, new OutputLogFilter()));
+ fs.listStatus(outDir, new Utils.OutputFileUtils.OutputFilesFilter()));
assertEquals(numReduces, outputFiles.length);
InputStream is = fs.open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -208,7 +209,8 @@
conf.set("mapred.reducer.class", "testjar.ExternalIdentityReducer");
- conf.setLong("mapred.min.split.size", 1024*1024);
+ conf.setLong(org.apache.hadoop.mapreduce.lib.input.
+ FileInputFormat.SPLIT_MINSIZE, 1024*1024);
conf.setNumReduceTasks(numReduces);
conf.setJobPriority(JobPriority.HIGH);
@@ -257,9 +259,8 @@
// Check Task directories
TaskAttemptID taskid = new TaskAttemptID(
new TaskID(jobId, TaskType.MAP, 1),0);
- TestMiniMRWithDFS.checkTaskDirectories(
- mrCluster, new String[]{jobId.toString()},
- new String[]{taskid.toString()});
+ TestMiniMRWithDFS.checkTaskDirectories(mrCluster, TEST1_UGI.getUserName(),
+ new String[] { jobId.toString() }, new String[] { taskid.toString() });
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = TestJobClient.runTool(conf, new JobClient(),
@@ -279,16 +280,19 @@
verifyOutput(outDir.getFileSystem(conf), outDir);
//TestJobHistory
- TestJobHistory.validateJobHistoryFileFormat(jobId, conf, "SUCCESS", false);
+ TestJobHistory.validateJobHistoryFileFormat(
+ mrCluster.getJobTrackerRunner().getJobTracker().getJobHistory(),
+ jobId, conf, "SUCCEEDED", false);
+
TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
- TestJobHistory.validateJobHistoryUserLogLocation(job.getID(), conf);
// Since we keep setKeepTaskFilesPattern, these files should still be
// present and will not be cleaned up.
for(int i=0; i < numTT; ++i) {
- String jobDirStr = mrCluster.getTaskTrackerLocalDir(i)+
- "/taskTracker/jobcache";
- boolean b = FileSystem.getLocal(conf).delete(new Path(jobDirStr), true);
+ Path jobDirPath =
+ new Path(mrCluster.getTaskTrackerLocalDir(i), TaskTracker
+ .getJobCacheSubdir(TEST1_UGI.getUserName()));
+ boolean b = FileSystem.getLocal(conf).delete(jobDirPath, true);
assertTrue(b);
}
}
@@ -315,9 +319,9 @@
conf.setOutputFormat(NullOutputFormat.class);
conf.setJobPriority(JobPriority.HIGH);
- conf.setLong("mapred.map.max.attempts", 1);
+ conf.setLong(JobContext.MAP_MAX_ATTEMPTS, 1);
- conf.set("hadoop.job.history.user.location", "none");
+ conf.set(JobContext.HISTORY_LOCATION, "none");
conf.setNumReduceTasks(0);
@@ -366,15 +370,13 @@
conf.setOutputFormat(NullOutputFormat.class);
conf.setNumReduceTasks(0);
- conf.setLong("mapred.map.max.attempts", 2);
+ conf.setLong(JobContext.MAP_MAX_ATTEMPTS, 2);
final Path inDir = new Path("./wc/input");
final Path outDir = new Path("./wc/output");
final Path histDir = new Path("./wc/history");
- conf.set("hadoop.job.history.user.location", histDir.toString());
-
- conf.setNumReduceTasks(numReduces);
+ conf.set(JobContext.HISTORY_LOCATION, histDir.toString());
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpecialCharactersInOutputPath.java Sat Nov 28 20:26:01 2009
@@ -34,6 +34,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Progressable;
/**
@@ -66,7 +67,7 @@
// use WordCount example
FileSystem.setDefaultUri(conf, fileSys);
- conf.set("mapred.job.tracker", jobTracker);
+ conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
conf.setJobName("foo");
conf.setInputFormat(TextInputFormat.class);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
public class TestSpeculativeExecution extends TestCase {
@@ -49,8 +50,8 @@
new TestSetup(new TestSuite(TestSpeculativeExecution.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()),
trackers);
for (String tracker : trackers) {
@@ -109,7 +110,7 @@
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(5);
conf.setNumReduceTasks(5);
- conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+ conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
//schedule maps
@@ -145,7 +146,7 @@
conf.setSpeculativeExecution(true);
conf.setNumMapTasks(5);
conf.setNumReduceTasks(0);
- conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+ conf.setFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD, 0.5f);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.initTasks();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSubmitJob.java Sat Nov 28 20:26:01 2009
@@ -19,8 +19,10 @@
import java.io.IOException;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.ToolRunner;
import junit.framework.TestCase;
@@ -46,12 +48,12 @@
throws Exception {
JobConf jtConf = new JobConf();
jtConf
- .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
- jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+ .setLong(MRConfig.MAPMEMORY_MB, 1 * 1024L);
+ jtConf.setLong(MRConfig.REDUCEMEMORY_MB,
2 * 1024L);
- jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ jtConf.setLong(JTConfig.JT_MAX_MAPMEMORY_MB,
3 * 1024L);
- jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ jtConf.setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB,
4 * 1024L);
miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Sat Nov 28 20:26:01 2009
@@ -22,10 +22,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import junit.framework.TestCase;
@@ -157,10 +159,8 @@
4 * 1024 * 1024 * 1024L);
conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
2 * 1024 * 1024 * 1024L);
- conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
- 512L);
- conf.setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+ conf.setLong(MRConfig.MAPMEMORY_MB, 512L);
+ conf.setLong(MRConfig.REDUCEMEMORY_MB, 1024L);
try {
setUpCluster(conf);
@@ -202,9 +202,9 @@
private void setUpCluster(JobConf conf)
throws Exception {
- conf.setClass("mapred.jobtracker.taskScheduler",
+ conf.setClass(JTConfig.JT_TASK_SCHEDULER,
TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
- conf.set("mapred.job.tracker.handler.count", "1");
+ conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskFail.java Sat Nov 28 20:26:01 2009
@@ -19,6 +19,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import junit.framework.TestCase;
@@ -30,6 +31,7 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
public class TestTaskFail extends TestCase {
@@ -40,7 +42,7 @@
implements Mapper<LongWritable, Text, Text, IntWritable> {
String taskid;
public void configure(JobConf job) {
- taskid = job.get("mapred.task.id");
+ taskid = job.get(JobContext.TASK_ATTEMPT_ID);
}
public void map (LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
@@ -50,7 +52,9 @@
throw new IOException();
} else if (taskid.endsWith("_1")) {
System.exit(-1);
- }
+ } else if (taskid.endsWith("_2")) {
+ throw new Error();
+ }
}
}
@@ -100,6 +104,7 @@
conf.setNumReduceTasks(0);
FileInputFormat.setInputPaths(conf, inDir);
FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setSpeculativeExecution(false);
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
"/tmp")).toString().replace(' ', '+');
conf.set("test.build.data", TEST_ROOT_DIR);
@@ -107,46 +112,92 @@
return new JobClient(conf).submitJob(conf);
}
+ private void validateAttempt(TaskInProgress tip, TaskAttemptID attemptId,
+ TaskStatus ts, boolean isCleanup)
+ throws IOException {
+ assertEquals(isCleanup, tip.isCleanupAttempt(attemptId));
+ assertTrue(ts != null);
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+ // validate tasklogs for task attempt
+ String log = readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, false);
+ assertTrue(log.contains(taskLog));
+ if (!isCleanup) {
+ // validate task logs: tasklog should contain both task logs
+ // and cleanup logs
+ assertTrue(log.contains(cleanupLog));
+ } else {
+ // validate tasklogs for cleanup attempt
+ log = readTaskLog(
+ TaskLog.LogName.STDERR, attemptId, true);
+ assertTrue(log.contains(cleanupLog));
+ }
+ }
+
+ /**
+ * Reads tasklog and returns it as string after trimming it.
+ * @param filter Task log filter; can be STDOUT, STDERR,
+ * SYSLOG, DEBUGOUT, DEBUGERR
+ * @param taskId The task id for which the log has to collected
+ * @param isCleanup whether the task is a cleanup attempt or not.
+ * @return task log as string
+ * @throws IOException
+ */
+ private String readTaskLog(TaskLog.LogName filter,
+ TaskAttemptID taskId,
+ boolean isCleanup)
+ throws IOException {
+ // string buffer to store task log
+ StringBuffer result = new StringBuffer();
+ int res;
+
+ // reads the whole tasklog into inputstream
+ InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
+ // construct string log from inputstream.
+ byte[] b = new byte[65536];
+ while (true) {
+ res = taskLogReader.read(b);
+ if (res > 0) {
+ result.append(new String(b));
+ } else {
+ break;
+ }
+ }
+ taskLogReader.close();
+
+ // trim the string and return it
+ String str = result.toString();
+ str = str.trim();
+ return str;
+ }
+
private void validateJob(RunningJob job, MiniMRCluster mr)
throws IOException {
assertEquals(JobStatus.SUCCEEDED, job.getJobState());
JobID jobId = job.getID();
// construct the task id of first map task
+ // this should not be cleanup attempt since the first attempt
+ // fails with an exception
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0);
TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
getTip(attemptId.getTaskID());
- // this should not be cleanup attempt since the first attempt
- // fails with an exception
- assertTrue(!tip.isCleanupAttempt(attemptId));
TaskStatus ts =
mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
- // validate task logs: tasklog should contain both task logs
- // and cleanup logs
- String log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, false);
- assertTrue(log.contains(taskLog));
- assertTrue(log.contains(cleanupLog));
+ validateAttempt(tip, attemptId, ts, false);
attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 1);
// this should be cleanup attempt since the second attempt fails
// with System.exit
- assertTrue(tip.isCleanupAttempt(attemptId));
ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
- assertTrue(ts != null);
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
- // validate tasklogs for task attempt
- log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, false);
- assertTrue(log.contains(taskLog));
-
- // validate tasklogs for cleanup attempt
- log = TestMiniMRMapRedDebugScript.readTaskLog(
- TaskLog.LogName.STDERR, attemptId, true);
- assertTrue(log.contains(cleanupLog));
+ validateAttempt(tip, attemptId, ts, true);
+
+ attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 2);
+ // this should be cleanup attempt since the third attempt fails
+ // with Error
+ ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+ validateAttempt(tip, attemptId, ts, true);
}
public void testWithDFS() throws IOException {
@@ -165,6 +216,9 @@
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
// launch job with fail tasks
JobConf jobConf = mr.createJobConf();
+ // turn down the completion poll interval from the 5 second default
+ // for better test performance.
+ jobConf.set(Job.COMPLETION_POLL_INTERVAL_KEY, "50");
jobConf.setOutputCommitter(CommitterWithLogs.class);
RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
rJob.waitForCompletion();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java Sat Nov 28 20:26:01 2009
@@ -21,6 +21,8 @@
import junit.framework.TestCase;
import java.io.IOException;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
/**
* A JUnit test to test configured task limits.
*/
@@ -29,8 +31,8 @@
static void runTest(int maxTasks, int numMaps, int numReds,
boolean shouldFail) throws Exception {
JobConf conf = new JobConf();
- conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
- conf.set("mapred.job.tracker.handler.count", "1");
+ conf.setInt(JTConfig.JT_TASKS_PER_JOB, maxTasks);
+ conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
MiniMRCluster mr = new MiniMRCluster(0, "file:///", 1, null, null, conf);
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobConf jc = mr.createJobConf();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Sat Nov 28 20:26:01 2009
@@ -37,6 +37,7 @@
import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
public class TestTaskTrackerBlacklisting extends TestCase {
@@ -144,9 +145,9 @@
new TestSetup(new TestSuite(TestTaskTrackerBlacklisting.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
- conf.setInt("mapred.max.tracker.blacklists", 1);
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+ conf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
jobTracker =
new FakeJobTracker(conf, (clock = new FakeJobTrackerClock()),
@@ -173,7 +174,7 @@
healthStatus.setLastReported(status.getLastReported());
}
jobTracker.heartbeat(tts, false, initialContact,
- false, (short) responseId);
+ false, responseId);
}
responseId++;
}
@@ -468,8 +469,8 @@
conf.setSpeculativeExecution(false);
conf.setNumMapTasks(0);
conf.setNumReduceTasks(5);
- conf.set("mapred.max.reduce.failures.percent", ".70");
- conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+ conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+ conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
conf.setMaxTaskFailuresPerTracker(1);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.setClusterSize(trackers.length);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Sat Nov 28 20:26:01 2009
@@ -18,22 +18,32 @@
package org.apache.hadoop.mapred;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import junit.framework.TestCase;
@@ -54,20 +64,53 @@
LogFactory.getLog(TestTaskTrackerLocalization.class);
protected TaskTracker tracker;
+ protected UserGroupInformation taskTrackerUGI;
+ protected TaskController taskController;
protected JobConf trackerFConf;
+ private JobConf localizedJobConf;
protected JobID jobId;
protected TaskAttemptID taskId;
protected Task task;
protected String[] localDirs;
protected static LocalDirAllocator lDirAlloc =
- new LocalDirAllocator("mapred.local.dir");
+ new LocalDirAllocator(MRConfig.LOCAL_DIR);
+ protected Path attemptWorkDir;
+ protected File[] attemptLogFiles;
+ protected JobConf localizedTaskConf;
+
+ class InlineCleanupQueue extends CleanupQueue {
+ List<Path> stalePaths = new ArrayList<Path>();
+
+ public InlineCleanupQueue() {
+ // do nothing
+ }
+
+ @Override
+ public void addToQueue(FileSystem fs, Path... paths) {
+ // delete in-line
+ for (Path p : paths) {
+ try {
+ LOG.info("Trying to delete the path " + p);
+ if (!fs.delete(p, true)) {
+ LOG.warn("Stale path " + p.toUri().getPath());
+ stalePaths.add(p);
+ }
+ } catch (IOException e) {
+ LOG.warn("Caught exception while deleting path "
+ + p.toUri().getPath());
+ LOG.info(StringUtils.stringifyException(e));
+ stalePaths.add(p);
+ }
+ }
+ }
+ }
@Override
protected void setUp()
throws Exception {
TEST_ROOT_DIR =
- new File(System.getProperty("test.build.data", "/tmp"),
- "testTaskTrackerLocalization");
+ new File(System.getProperty("test.build.data", "/tmp"), getClass()
+ .getSimpleName());
if (!TEST_ROOT_DIR.exists()) {
TEST_ROOT_DIR.mkdirs();
}
@@ -85,32 +128,29 @@
for (int i = 0; i < numLocalDirs; i++) {
localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
}
- trackerFConf.setStrings("mapred.local.dir", localDirs);
+ trackerFConf.setStrings(MRConfig.LOCAL_DIR, localDirs);
- // Create the job jar file
- File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
- JarOutputStream jstream =
- new JarOutputStream(new FileOutputStream(jobJarFile));
- ZipEntry ze = new ZipEntry("lib/lib1.jar");
- jstream.putNextEntry(ze);
- jstream.closeEntry();
- ze = new ZipEntry("lib/lib2.jar");
- jstream.putNextEntry(ze);
- jstream.closeEntry();
- jstream.finish();
- jstream.close();
- trackerFConf.setJar(jobJarFile.toURI().toString());
+ // Create the job configuration file. Same as trackerConf in this test.
+ Job job = new Job(trackerFConf);
- // Create the job configuration file
- File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
- FileOutputStream out = new FileOutputStream(jobConfFile);
- trackerFConf.writeXml(out);
- out.close();
+ job.setUGIAndUserGroupNames();
+
+ // JobClient uploads the job jar to the file system and sets it in the
+ // jobConf.
+ uploadJobJar(job);
- // Set up the TaskTracker
+ // JobClient uploads the jobConf to the file system.
+ File jobConfFile = uploadJobConf(job.getConfiguration());
+
+ // Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
- tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+ // for test case system FS is the local FS
+ tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+ tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
+
+ taskTrackerUGI = UserGroupInformation.login(trackerFConf);
// Set up the task to be localized
String jtIdentifier = "200907202331";
@@ -119,10 +159,75 @@
new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
task =
new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+ task.setConf(job.getConfiguration()); // Set conf. Set user name in particular.
- TaskController taskController = new DefaultTaskController();
+ // create jobTokens file
+ uploadJobTokensFile();
+
+
+ taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
+
+ tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+ taskController));
+ }
+
+ /**
+ * @param job
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ private void uploadJobJar(Job job)
+ throws IOException,
+ FileNotFoundException {
+ File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+ JarOutputStream jstream =
+ new JarOutputStream(new FileOutputStream(jobJarFile));
+ ZipEntry ze = new ZipEntry("lib/lib1.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ ze = new ZipEntry("lib/lib2.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ jstream.finish();
+ jstream.close();
+ job.setJar(jobJarFile.toURI().toString());
+ }
+
+ /**
+ * @param conf
+ * @return
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ protected File uploadJobConf(Configuration conf)
+ throws FileNotFoundException,
+ IOException {
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ conf.writeXml(out);
+ out.close();
+ return jobConfFile;
+ }
+
+ /**
+ * create fake JobTokens file
+ * @return
+ * @throws IOException
+ */
+ protected void uploadJobTokensFile() throws IOException {
+
+ File dir = new File(TEST_ROOT_DIR, jobId.toString());
+ if(!dir.exists())
+ assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs());
+
+ File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME);
+ FileOutputStream fos = new FileOutputStream(jobTokenFile);
+ java.io.DataOutputStream out = new java.io.DataOutputStream(fos);
+ JobTokens jt = new JobTokens();
+ jt.write(out); // writing empty file, we don't the keys for this test
+ out.close();
}
@Override
@@ -131,7 +236,7 @@
FileUtil.fullyDelete(TEST_ROOT_DIR);
}
- private static String[] getFilePermissionAttrs(String path)
+ protected static String[] getFilePermissionAttrs(String path)
throws IOException {
String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
return output.split(":|\n");
@@ -146,106 +251,162 @@
assertTrue("Path " + path + " has the permissions " + attrs[0]
+ " instead of the expected " + expectedPermissions, attrs[0]
.equals(expectedPermissions));
- assertTrue("Path " + path + " is not user owned not by "
- + expectedOwnerUser + " but by " + attrs[1], attrs[1]
- .equals(expectedOwnerUser));
- assertTrue("Path " + path + " is not group owned not by "
- + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
- .equals(expectedOwnerGroup));
+ assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
+ + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser));
+ assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
+ + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup));
}
/**
* Verify the task-controller's setup functionality
*
* @throws IOException
- * @throws LoginException
*/
public void testTaskControllerSetup()
- throws IOException,
- LoginException {
+ throws IOException {
// Task-controller is already set up in the test's setup method. Now verify.
- UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
for (String localDir : localDirs) {
// Verify the local-dir itself.
File lDir = new File(localDir);
assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
- checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
+ }
+
+ // Verify the pemissions on the userlogs dir
+ File taskLog = TaskLog.getUserLogDir();
+ checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
+ }
+
+ /**
+ * Test the localization of a user on the TT.
+ *
+ * @throws IOException
+ */
+ public void testUserLocalization()
+ throws IOException {
+
+ // /////////// The main method being tested
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ // ///////////
+
+ // Check the directory structure and permissions
+ checkUserLocalization();
+
+ // For the sake of testing re-entrancy of initializeUserDirs(), we remove
+ // the user directories now and make sure that further calls of the method
+ // don't create directories any more.
+ for (String dir : localDirs) {
+ File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+ FileUtil.fullyDelete(userDir);
+ }
+
+ // Now call the method again.
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+ // Files should not be created now and so shouldn't be there anymore.
+ for (String dir : localDirs) {
+ File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+ assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
+ + " exists!", userDir.exists());
+ }
+ }
+
+ protected void checkUserLocalization()
+ throws IOException {
+ for (String dir : localDirs) {
+
+ File localDir = new File(dir);
+ assertTrue(MRConfig.LOCAL_DIR + localDir + " isn'task created!",
+ localDir.exists());
+
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ + "is not created!", taskTrackerSubDir.exists());
+
+ File userDir = new File(taskTrackerSubDir, task.getUser());
+ assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+ + "is not created!", userDir.exists());
+ checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+ File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+ assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+ jobCache.exists());
+ checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
// Verify the distributed cache dir.
File distributedCacheDir =
- new File(localDir, TaskTracker.getDistributedCacheDir());
+ new File(localDir, TaskTracker
+ .getDistributedCacheDir(task.getUser()));
assertTrue("distributed cache dir " + distributedCacheDir
+ " doesn't exists!", distributedCacheDir.exists());
checkFilePermissions(distributedCacheDir.getAbsolutePath(),
- "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
-
- // Verify the job cache dir.
- File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
- assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
- jobCacheDir.exists());
- checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]);
}
-
- // Verify the pemissions on the userlogs dir
- File taskLog = TaskLog.getUserLogDir();
- checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
}
/**
* Test job localization on a TT. Tests localization of job.xml, job.jar and
- * corresponding setting of configuration.
+ * corresponding setting of configuration. Also test
+ * {@link TaskController#initializeJob(JobInitializationContext)}
*
* @throws IOException
- * @throws LoginException
*/
public void testJobLocalization()
- throws IOException,
- LoginException {
+ throws IOException {
+
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
// /////////// The main method being tested
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ localizedJobConf = tracker.localizeJobFiles(task);
// ///////////
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = task.getUser();
+ context.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+ // /////////// The method being tested
+ taskController.initializeJob(context);
+ // ///////////
+
+ checkJobLocalization();
+ }
+
+ protected void checkJobLocalization()
+ throws IOException {
// Check the directory structure
for (String dir : localDirs) {
File localDir = new File(dir);
- assertTrue("mapred.local.dir " + localDir + " isn'task created!",
- localDir.exists());
-
File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
- assertTrue("taskTracker sub-dir in the local-dir " + localDir
- + "is not created!", taskTrackerSubDir.exists());
-
- File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
- assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
- + " isn'task created!", jobCache.exists());
+ File userDir = new File(taskTrackerSubDir, task.getUser());
+ File jobCache = new File(userDir, TaskTracker.JOBCACHE);
File jobDir = new File(jobCache, jobId.toString());
- assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
- .exists());
+ assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());
// check the private permissions on the job directory
- UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
- checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
}
// check the localization of job.xml
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-
assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
- .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
- trackerFConf) != null);
+ .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(),
+ jobId.toString()), trackerFConf) != null);
// check the localization of job.jar
Path jarFileLocalized =
- lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
- .toString()), trackerFConf);
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(),
+ jobId.toString()), trackerFConf);
assertTrue("job.jar is not localized on this TaskTracker!!",
jarFileLocalized != null);
assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
@@ -257,22 +418,22 @@
// check the creation of job work directory
assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
- .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
- trackerFConf) != null);
+ .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId
+ .toString()), trackerFConf) != null);
- // Check the setting of job.local.dir and job.jar which will eventually be
+ // Check the setting of mapreduce.job.local.dir and job.jar which will eventually be
// used by the user's task
boolean jobLocalDirFlag = false, mapredJarFlag = false;
String localizedJobLocalDir =
localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
String localizedJobJar = localizedJobConf.getJar();
- for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+ for (String localDir : localizedJobConf.getStrings(MRConfig.LOCAL_DIR)) {
if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
- + TaskTracker.getJobWorkDir(jobId.toString()))) {
+ + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) {
jobLocalDirFlag = true;
}
if (localizedJobJar.equals(localDir + Path.SEPARATOR
- + TaskTracker.getJobJarFile(jobId.toString()))) {
+ + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) {
mapredJarFlag = true;
}
}
@@ -280,7 +441,7 @@
+ " is not set properly to the target users directory : "
+ localizedJobLocalDir, jobLocalDirFlag);
assertTrue(
- "mapred.jar is not set properly to the target users directory : "
+ "mapreduce.job.jar is not set properly to the target users directory : "
+ localizedJobJar, mapredJarFlag);
}
@@ -288,13 +449,21 @@
* Test task localization on a TT.
*
* @throws IOException
- * @throws LoginException
*/
public void testTaskLocalization()
- throws IOException,
- LoginException {
+ throws IOException {
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ localizedJobConf = tracker.localizeJobFiles(task);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = task.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
@@ -304,78 +473,195 @@
// //////////
// check the functionality of localizeTask
- for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
- assertTrue("attempt-dir in localDir " + dir + " is not created!!",
- new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
- .toString())).exists());
+ for (String dir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
+ File attemptDir =
+ new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+ .toString(), taskId.toString()));
+ assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+ + " is not created!!", attemptDir.exists());
}
- Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf);
+ attemptWorkDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+ task.getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("atttempt work dir for " + taskId.toString()
- + " is not created in any of the configured dirs!!", workDir != null);
+ + " is not created in any of the configured dirs!!",
+ attemptWorkDir != null);
TaskRunner runner = task.createRunner(tracker, tip);
// /////// Few more methods being tested
runner.setupChildTaskConfiguration(lDirAlloc);
- TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
localizedJobConf);
- File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
- // ///////
+ attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
// Make sure the task-conf file is created
Path localTaskFile =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf);
+ .getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("Task conf file " + localTaskFile.toString()
+ " is not created!!", new File(localTaskFile.toUri().getPath())
.exists());
// /////// One more method being tested. This happens in child space.
- JobConf localizedTaskConf = new JobConf(localTaskFile);
+ localizedTaskConf = new JobConf(localTaskFile);
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
// ///////
- // Make sure that the mapred.local.dir is sandboxed
+ // Initialize task via TaskController
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// The method being tested
+ taskController.initializeTask(taskContext);
+ // ///////////
+
+ checkTaskLocalization();
+ }
+
+ protected void checkTaskLocalization()
+ throws IOException {
+ // Make sure that the mapreduce.cluster.local.dir is sandboxed
for (String childMapredLocalDir : localizedTaskConf
- .getStrings("mapred.local.dir")) {
+ .getStrings(MRConfig.LOCAL_DIR)) {
assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
- childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
- .toString(), taskId.toString(), false)));
+ childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
+ .getUser(), jobId.toString(), taskId.toString(), false)));
}
// Make sure task task.getJobFile is changed and pointed correctly.
assertTrue(task.getJobFile().endsWith(
- TaskTracker
- .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+ TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
+ .toString(), false)));
// Make sure that the tmp directories are created
assertTrue("tmp dir is not created in workDir "
- + workDir.toUri().getPath(),
- new File(workDir.toUri().getPath(), "tmp").exists());
+ + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri()
+ .getPath(), "tmp").exists());
- // Make sure that the log are setup properly
+ // Make sure that the logs are setup properly
File logDir =
new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+ task.getTaskID().toString());
assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
logDir.exists());
- UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
- checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
assertTrue("stdout log file is improper. Expected : "
- + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
- expectedStdout.toString().equals(logFiles[0].toString()));
+ + expectedStdout.toString() + " Observed : "
+ + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
+ attemptLogFiles[0].toString()));
File expectedStderr =
new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
assertTrue("stderr log file is improper. Expected : "
- + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
- expectedStderr.toString().equals(logFiles[1].toString()));
+ + expectedStderr.toString() + " Observed : "
+ + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
+ attemptLogFiles[1].toString()));
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testTaskCleanup()
+ throws IOException {
+
+ // Localize job and localize task.
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ localizedJobConf = tracker.localizeJobFiles(task);
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = localizedJobConf.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+ tip.localizeTask(task);
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+ task.getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
+ TaskRunner runner = task.createRunner(tracker, tip);
+ tip.setTaskRunner(runner);
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ TaskRunner.prepareLogFiles(task.getTaskID());
+ Path localTaskFile =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
+ JobConf localizedTaskConf = new JobConf(localTaskFile);
+ TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// The method being tested
+ taskController.initializeTask(taskContext);
+
+ // TODO: Let the task run and create files.
+
+ InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+ tracker.directoryCleanupThread = cleanupQueue;
+
+ // ////////// The central methods being tested
+ tip.removeTaskFiles(true, taskId);
+ tracker.removeJobFiles(task.getUser(), jobId.toString());
+ // //////////
+
+ // TODO: make sure that all files intended to be deleted are deleted.
+
+ assertTrue("Some task files are not deleted!! Number of stale paths is "
+ + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+
+ // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still
+ // there.
+ for (String localDir : localDirs) {
+ Path userDir =
+ new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+ assertTrue("User directory " + userDir + " is not present!!",
+ tracker.localFs.exists(userDir));
+ }
+
+ // Test userlogs cleanup.
+ verifyUserLogsCleanup();
+ }
+
+ /**
+ * Test userlogs cleanup.
+ *
+ * @throws IOException
+ */
+ private void verifyUserLogsCleanup()
+ throws IOException {
+ Path logDir =
+ new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
+ + Path.SEPARATOR + task.getTaskID().toString());
+
+ // Logs should be there before cleanup.
+ assertTrue("Userlogs dir " + logDir + " is not presen as expected!!",
+ tracker.localFs.exists(logDir));
+
+ // ////////// Another being tested
+ TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
+ // modification time behind retainTimeStatmp
+ // //////////
+
+ // Logs should be gone after cleanup.
+ assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
+ tracker.localFs.exists(logDir));
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Sat Nov 28 20:26:01 2009
@@ -28,13 +28,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.TestProcfsBasedProcessTree;
import org.apache.hadoop.util.ToolRunner;
import junit.framework.TestCase;
@@ -57,10 +60,10 @@
private void startCluster(JobConf conf)
throws Exception {
- conf.set("mapred.job.tracker.handler.count", "1");
- conf.set("mapred.tasktracker.map.tasks.maximum", "1");
- conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
- conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+ conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
+ conf.set(TTConfig.TT_MAP_SLOTS, "1");
+ conf.set(TTConfig.TT_REDUCE_SLOTS, "1");
+ conf.set(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, "0");
miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
}
@@ -170,11 +173,8 @@
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
- fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024L);
- fConf.setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
- 2 * 1024L);
+ fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024L);
+ fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024L);
startCluster(new JobConf());
JobConf conf = new JobConf(miniMRCluster.createJobConf());
@@ -199,13 +199,10 @@
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
- fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+ fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
- fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
- 2 * 1024);
- fConf.setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
- 2 * 1024);
+ fConf.setLong(MRConfig.MAPMEMORY_MB, 2 * 1024);
+ fConf.setLong(MRConfig.REDUCEMEMORY_MB, 2 * 1024);
startCluster(fConf);
runJobExceedingMemoryLimit();
}
@@ -227,7 +224,7 @@
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
// very small value, so that no task escapes to successful completion.
- fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+ fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
//set old values, max vm property per task and upper limit on the tasks
//vm
@@ -320,16 +317,16 @@
// Start cluster with proper configuration.
JobConf fConf = new JobConf();
- fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+ fConf.setLong(MRConfig.MAPMEMORY_MB,
1L);
fConf.setLong(
- JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+ MRConfig.REDUCEMEMORY_MB, 1L);
// Because of the above, the total tt limit is 2mb
long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
// very small value, so that no task escapes to successful completion.
- fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
+ fConf.set(TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL,
String.valueOf(300));
startCluster(fConf);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -18,18 +18,34 @@
package org.apache.hadoop.mapred;
-import java.io.*;
-import java.util.*;
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.BitSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.ReflectionUtils;
-public class TestTextInputFormat extends TestCase {
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestTextInputFormat {
private static final Log LOG =
LogFactory.getLog(TestTextInputFormat.class.getName());
@@ -39,17 +55,19 @@
private static FileSystem localFs = null;
static {
try {
+ defaultConf.set("fs.default.name", "file:///");
localFs = FileSystem.getLocal(defaultConf);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "."), "data"),
- "TestTextInputFormat");
-
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+ "TestTextInputFormat").makeQualified(localFs);
+
+ @Test
public void testFormat() throws Exception {
- JobConf job = new JobConf();
+ JobConf job = new JobConf(defaultConf);
Path file = new Path(workDir, "test.txt");
// A reporter that does nothing
@@ -127,6 +145,100 @@
}
}
+ @Test
+ public void testSplitableCodecs() throws IOException {
+ JobConf conf = new JobConf(defaultConf);
+ int seed = new Random().nextInt();
+ // Create the codec
+ CompressionCodec codec = null;
+ try {
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Illegal codec!");
+ }
+ Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+ // A reporter that does nothing
+ Reporter reporter = Reporter.NULL;
+ LOG.info("seed = "+seed);
+ Random random = new Random(seed);
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ localFs.delete(workDir, true);
+ FileInputFormat.setInputPaths(conf, workDir);
+
+ final int MAX_LENGTH = 500000;
+
+ // for a variety of lengths
+ for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 4)+1) {
+
+ LOG.info("creating; entries = " + length);
+
+
+ // create a file with length entries
+ Writer writer =
+ new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ TextInputFormat format = new TextInputFormat();
+ format.configure(conf);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ for (int i = 0; i < 3; i++) {
+ int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
+ LOG.info("splitting: requesting = " + numSplits);
+ InputSplit[] splits = format.getSplits(conf, numSplits);
+ LOG.info("splitting: got = " + splits.length);
+
+
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split["+j+"]= " + splits[j]);
+ RecordReader<LongWritable, Text> reader =
+ format.getRecordReader(splits[j], conf, reporter);
+ try {
+ int counter = 0;
+ while (reader.next(key, value)) {
+ int v = Integer.parseInt(value.toString());
+ LOG.debug("read " + v);
+
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v +
+ " in split " + j +
+ " at position "+reader.getPos());
+ }
+ assertFalse("Key in multiple partitions.", bits.get(v));
+ bits.set(v);
+ counter++;
+ }
+ if (counter > 0) {
+ LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
+ } else {
+ LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+
+ }
+
+ }
+
private static LineReader makeStream(String str) throws IOException {
return new LineReader(new ByteArrayInputStream
(str.getBytes("UTF-8")),
@@ -137,7 +249,8 @@
(str.getBytes("UTF-8")),
bufsz);
}
-
+
+ @Test
public void testUTF8() throws Exception {
LineReader in = makeStream("abcd\u20acbdcd\u20ac");
Text line = new Text();
@@ -156,6 +269,7 @@
*
* @throws Exception
*/
+ @Test
public void testNewLines() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@@ -195,6 +309,7 @@
*
* @throws Exception
*/
+ @Test
public void testMaxLineLength() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@@ -219,6 +334,38 @@
}
}
+ @Test
+ public void testMRMaxLine() throws Exception {
+ final int MAXPOS = 1024 * 1024;
+ final int MAXLINE = 10 * 1024;
+ final int BUF = 64 * 1024;
+ final InputStream infNull = new InputStream() {
+ int position = 0;
+ final int MAXPOSBUF = 1024 * 1024 + BUF; // max LRR pos + LineReader buf
+ @Override
+ public int read() {
+ ++position;
+ return 0;
+ }
+ @Override
+ public int read(byte[] b) {
+ assertTrue("Read too many bytes from the stream", position < MAXPOSBUF);
+ Arrays.fill(b, (byte) 0);
+ position += b.length;
+ return b.length;
+ }
+ };
+ final LongWritable key = new LongWritable();
+ final Text val = new Text();
+ LOG.info("Reading a line from /dev/null");
+ final Configuration conf = new Configuration(false);
+ conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+ LineRecordReader.MAX_LINE_LENGTH, MAXLINE);
+ conf.setInt("io.file.buffer.size", BUF); // used by LRR
+ final LineRecordReader lrr = new LineRecordReader(infNull, 0, MAXPOS, conf);
+ assertFalse("Read a line from null", lrr.next(key, val));
+ }
+
private static void writeFile(FileSystem fs, Path name,
CompressionCodec codec,
String contents) throws IOException {
@@ -244,7 +391,7 @@
Text value = reader.createValue();
while (reader.next(key, value)) {
result.add(value);
- value = (Text) reader.createValue();
+ value = reader.createValue();
}
reader.close();
return result;
@@ -253,8 +400,9 @@
/**
* Test using the gzip codec for reading
*/
- public static void testGzip() throws IOException {
- JobConf job = new JobConf();
+ @Test
+ public void testGzip() throws IOException {
+ JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
@@ -286,8 +434,9 @@
/**
* Test using the gzip codec and an empty input file
*/
- public static void testGzipEmpty() throws IOException {
- JobConf job = new JobConf();
+ @Test
+ public void testGzipEmpty() throws IOException {
+ JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
ReflectionUtils.setConf(gzip, job);
localFs.delete(workDir, true);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -47,7 +47,7 @@
@SuppressWarnings("unchecked")
public void testFormat() throws Exception {
JobConf job = new JobConf();
- job.set("mapred.task.id", attempt);
+ job.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
@@ -99,8 +99,8 @@
public void testFormatWithCustomSeparator() throws Exception {
JobConf job = new JobConf();
String separator = "\u0001";
- job.set("mapred.textoutputformat.separator", separator);
- job.set("mapred.task.id", attempt);
+ job.set("mapreduce.output.textoutputformat.separator", separator);
+ job.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
public class TestTrackerBlacklistAcrossJobs extends TestCase {
private static final String hosts[] = new String[] {
@@ -36,7 +38,7 @@
String hostname = "";
public void configure(JobConf job) {
- this.hostname = job.get("slave.host.name");
+ this.hostname = job.get(TTConfig.TT_HOST_NAME);
}
public void map(NullWritable key, NullWritable value,
@@ -57,7 +59,7 @@
fileSys = FileSystem.get(conf);
// start mr cluster
JobConf jtConf = new JobConf();
- jtConf.setInt("mapred.max.tracker.blacklists", 1);
+ jtConf.setInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 1);
mr = new MiniMRCluster(3, fileSys.getUri().toString(),
1, null, hosts, jtConf);
@@ -65,7 +67,7 @@
// setup job configuration
JobConf mrConf = mr.createJobConf();
JobConf job = new JobConf(mrConf);
- job.setInt("mapred.max.tracker.failures", 1);
+ job.setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 1);
job.setNumMapTasks(6);
job.setNumReduceTasks(0);
job.setMapperClass(FailOnHostMapper.class);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerReservation.java Sat Nov 28 20:26:01 2009
@@ -23,7 +23,9 @@
import javax.security.auth.login.LoginException;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import junit.extensions.TestSetup;
@@ -56,8 +58,8 @@
TestSetup setup = new TestSetup(new TestSuite(TestTrackerReservation.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
for (String tracker : trackers) {
FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
@@ -91,8 +93,7 @@
conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false);
- conf.setBoolean(
- "mapred.committer.job.setup.cleanup.needed", false);
+ conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
//Set task tracker objects for reservation.
TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
@@ -124,6 +125,11 @@
2, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not reserved for the job : reduces",
2, fjob.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
TaskAttemptID mTid = fjob.findMapTask(trackers[1]);
TaskAttemptID rTid = fjob.findReduceTask(trackers[1]);
@@ -138,6 +144,11 @@
0, fjob.getNumReservedTaskTrackersForMaps());
assertEquals("Reservation for the job not released : Reduces",
0, fjob.getNumReservedTaskTrackersForReduces());
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
/**
@@ -165,6 +176,11 @@
0, job.getNumReservedTaskTrackersForMaps());
assertEquals("Reservation for the job not released : Reduces",
0, job.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
/**
@@ -179,9 +195,9 @@
conf.setSpeculativeExecution(false);
conf.setNumMapTasks(2);
conf.setNumReduceTasks(2);
- conf.set("mapred.max.reduce.failures.percent", ".70");
- conf.set("mapred.max.map.failures.percent", ".70");
- conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+ conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
+ conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
+ conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
conf.setMaxTaskFailuresPerTracker(1);
FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
job.setClusterSize(trackers.length);
@@ -212,6 +228,11 @@
2, job.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not reserved for the job : reduces",
2, job.getNumReservedTaskTrackersForReduces());
+ ClusterMetrics metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 4, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 4, metrics.getReservedReduceSlots());
/*
* FakeJobInProgress.findMapTask does not handle
@@ -230,6 +251,12 @@
1, job.getNumReservedTaskTrackersForMaps());
assertEquals("Extra Trackers reserved for the job : reduces",
1, job.getNumReservedTaskTrackersForReduces());
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 2, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 2, metrics.getReservedReduceSlots());
+
//Finish the map task on the tracker 1. Finishing it here to work
//around bug in the FakeJobInProgress object
job.finishTask(mTid);
@@ -245,7 +272,11 @@
0, job.getNumReservedTaskTrackersForMaps());
assertEquals("Trackers not unreserved for the job : reduces",
0, job.getNumReservedTaskTrackersForReduces());
-
+ metrics = jobTracker.getClusterMetrics();
+ assertEquals("reserved map slots do not match",
+ 0, metrics.getReservedMapSlots());
+ assertEquals("reserved reduce slots do not match",
+ 0, metrics.getReservedReduceSlots());
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java Sat Nov 28 20:26:01 2009
@@ -101,7 +101,8 @@
RunningJob runningJob = JobClient.runJob(conf);
Path[] outputFiles = FileUtil.stat2Paths(
- fs.listStatus(OUTPUT_DIR, new OutputLogFilter()));
+ fs.listStatus(OUTPUT_DIR,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = fs.open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));