You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/28 12:55:31 UTC
svn commit: r769330 - in /hadoop/core/trunk: ./
src/examples/org/apache/hadoop/examples/
src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/
Author: sharad
Date: Tue Apr 28 10:55:30 2009
New Revision: 769330
URL: http://svn.apache.org/viewvc?rev=769330&view=rev
Log:
HADOOP-5680. Change SleepJob to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 28 10:55:30 2009
@@ -256,6 +256,9 @@
HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
TupleWritable encoding. (Jingkei Ly via cdouglas)
+ HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new
+ mapreduce api. (Amareshwari Sriramadasu via sharad)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Tue Apr 28 10:55:30 2009
@@ -20,18 +20,18 @@
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
-import java.util.Iterator;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -42,64 +42,69 @@
* of <code>numMappers * mapSleepTime / 100</code>, so the job uses
* some disk space.
*/
-public class SleepJob extends Configured implements Tool,
- Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
- Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
- Partitioner<IntWritable,NullWritable> {
-
- private long mapSleepDuration = 100;
- private long reduceSleepDuration = 100;
- private int mapSleepCount = 1;
- private int reduceSleepCount = 1;
- private int count = 0;
+public class SleepJob extends Configured implements Tool {
- public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
- return k.get() % numPartitions;
+ public static class SleepJobPartitioner extends
+ Partitioner<IntWritable, NullWritable> {
+ public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+ return k.get() % numPartitions;
+ }
}
- public static class EmptySplit implements InputSplit {
+ public static class EmptySplit extends InputSplit implements Writable {
public void write(DataOutput out) throws IOException { }
public void readFields(DataInput in) throws IOException { }
public long getLength() { return 0L; }
public String[] getLocations() { return new String[0]; }
}
- public static class SleepInputFormat extends Configured
- implements InputFormat<IntWritable,IntWritable> {
- public InputSplit[] getSplits(JobConf conf, int numSplits) {
- InputSplit[] ret = new InputSplit[numSplits];
+ public static class SleepInputFormat
+ extends InputFormat<IntWritable,IntWritable> {
+
+ public List<InputSplit> getSplits(JobContext jobContext) {
+ List<InputSplit> ret = new ArrayList<InputSplit>();
+ int numSplits = jobContext.getConfiguration().
+ getInt("mapred.map.tasks", 1);
for (int i = 0; i < numSplits; ++i) {
- ret[i] = new EmptySplit();
+ ret.add(new EmptySplit());
}
return ret;
}
- public RecordReader<IntWritable,IntWritable> getRecordReader(
- InputSplit ignored, JobConf conf, Reporter reporter)
+
+ public RecordReader<IntWritable,IntWritable> createRecordReader(
+ InputSplit ignored, TaskAttemptContext taskContext)
throws IOException {
+ Configuration conf = taskContext.getConfiguration();
final int count = conf.getInt("sleep.job.map.sleep.count", 1);
if (count < 0) throw new IOException("Invalid map count: " + count);
final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
if (redcount < 0)
throw new IOException("Invalid reduce count: " + redcount);
- final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
- return new RecordReader<IntWritable,IntWritable>() {
+ final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+ return new RecordReader<IntWritable,IntWritable>() {
private int records = 0;
private int emitCount = 0;
+ private IntWritable key = null;
+ private IntWritable value = null;
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ }
- public boolean next(IntWritable key, IntWritable value)
+ public boolean nextKeyValue()
throws IOException {
+ key = new IntWritable();
key.set(emitCount);
int emit = emitPerMapTask / count;
if ((emitPerMapTask) % count > records) {
++emit;
}
emitCount += emit;
+ value = new IntWritable();
value.set(emit);
return records++ < count;
}
- public IntWritable createKey() { return new IntWritable(); }
- public IntWritable createValue() { return new IntWritable(); }
- public long getPos() throws IOException { return records; }
+ public IntWritable getCurrentKey() { return key; }
+ public IntWritable getCurrentValue() { return value; }
public void close() throws IOException { }
public float getProgress() throws IOException {
return records / ((float)count);
@@ -108,93 +113,104 @@
}
}
- public void map(IntWritable key, IntWritable value,
- OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
- throws IOException {
-
- //it is expected that every map processes mapSleepCount number of records.
- try {
- reporter.setStatus("Sleeping... (" +
+ public static class SleepMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ private long mapSleepDuration = 100;
+ private int mapSleepCount = 1;
+ private int count = 0;
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.mapSleepCount =
+ conf.getInt("sleep.job.map.sleep.count", mapSleepCount);
+ this.mapSleepDuration =
+ conf.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+ }
+
+ public void map(IntWritable key, IntWritable value, Context context
+ ) throws IOException, InterruptedException {
+ //it is expected that every map processes mapSleepCount number of records.
+ try {
+ context.setStatus("Sleeping... (" +
(mapSleepDuration * (mapSleepCount - count)) + ") ms left");
- Thread.sleep(mapSleepDuration);
- }
- catch (InterruptedException ex) {
- throw (IOException)new IOException(
- "Interrupted while sleeping").initCause(ex);
- }
- ++count;
- // output reduceSleepCount * numReduce number of random values, so that
- // each reducer will get reduceSleepCount number of keys.
- int k = key.get();
- for (int i = 0; i < value.get(); ++i) {
- output.collect(new IntWritable(k + i), NullWritable.get());
+ Thread.sleep(mapSleepDuration);
+ }
+ catch (InterruptedException ex) {
+ throw (IOException)new IOException(
+ "Interrupted while sleeping").initCause(ex);
+ }
+ ++count;
+ // output reduceSleepCount * numReduce number of random values, so that
+ // each reducer will get reduceSleepCount number of keys.
+ int k = key.get();
+ for (int i = 0; i < value.get(); ++i) {
+ context.write(new IntWritable(k + i), NullWritable.get());
+ }
}
}
+
+ public static class SleepReducer
+ extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+ private long reduceSleepDuration = 100;
+ private int reduceSleepCount = 1;
+ private int count = 0;
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.reduceSleepCount =
+ conf.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+ this.reduceSleepDuration =
+ conf.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
+ }
- public void reduce(IntWritable key, Iterator<NullWritable> values,
- OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+ public void reduce(IntWritable key, Iterable<NullWritable> values,
+ Context context)
throws IOException {
- try {
- reporter.setStatus("Sleeping... (" +
- (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+ try {
+ context.setStatus("Sleeping... (" +
+ (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
Thread.sleep(reduceSleepDuration);
- }
- catch (InterruptedException ex) {
- throw (IOException)new IOException(
+ }
+ catch (InterruptedException ex) {
+ throw (IOException)new IOException(
"Interrupted while sleeping").initCause(ex);
+ }
+ count++;
}
- count++;
}
- public void configure(JobConf job) {
- this.mapSleepCount =
- job.getInt("sleep.job.map.sleep.count", mapSleepCount);
- this.reduceSleepCount =
- job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
- this.mapSleepDuration =
- job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
- this.reduceSleepDuration =
- job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
- }
-
- public void close() throws IOException {
- }
-
- public static void main(String[] args) throws Exception{
+ public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
System.exit(res);
}
- public int run(int numMapper, int numReducer, long mapSleepTime,
- int mapSleepCount, long reduceSleepTime,
- int reduceSleepCount) throws IOException {
- JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime,
- mapSleepCount, reduceSleepTime, reduceSleepCount);
- JobClient.runJob(job);
- return 0;
- }
-
- public JobConf setupJobConf(int numMapper, int numReducer,
- long mapSleepTime, int mapSleepCount,
- long reduceSleepTime, int reduceSleepCount) {
- JobConf job = new JobConf(getConf(), SleepJob.class);
- job.setNumMapTasks(numMapper);
+ public Job createJob(int numMapper, int numReducer,
+ long mapSleepTime, int mapSleepCount,
+ long reduceSleepTime, int reduceSleepCount)
+ throws IOException {
+ Configuration conf = getConf();
+ conf.setLong("sleep.job.map.sleep.time", mapSleepTime);
+ conf.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+ conf.setInt("sleep.job.map.sleep.count", mapSleepCount);
+ conf.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+ conf.setInt("mapred.map.tasks", numMapper);
+ Job job = new Job(conf, "sleep");
+ job.setNumReduceTasks(numReducer);
+ job.setJarByClass(SleepJob.class);
job.setNumReduceTasks(numReducer);
- job.setMapperClass(SleepJob.class);
+ job.setMapperClass(SleepMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(NullWritable.class);
- job.setReducerClass(SleepJob.class);
- job.setOutputFormat(NullOutputFormat.class);
- job.setInputFormat(SleepInputFormat.class);
- job.setPartitionerClass(SleepJob.class);
+ job.setReducerClass(SleepReducer.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setInputFormatClass(SleepInputFormat.class);
+ job.setPartitionerClass(SleepJobPartitioner.class);
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
FileInputFormat.addInputPath(job, new Path("ignored"));
- job.setLong("sleep.job.map.sleep.time", mapSleepTime);
- job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
- job.setInt("sleep.job.map.sleep.count", mapSleepCount);
- job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
return job;
}
@@ -205,7 +221,7 @@
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]");
ToolRunner.printGenericCommandUsage(System.err);
- return -1;
+ return 2;
}
int numMapper = 1, numReducer = 1;
@@ -233,9 +249,9 @@
// sleep for *SleepTime duration in Task by recSleepTime per record
mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
-
- return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
- reduceSleepTime, reduceSleepCount);
+ Job job = createJob(numMapper, numReducer, mapSleepTime,
+ mapSleepCount, reduceSleepTime, reduceSleepCount);
+ return job.waitForCompletion(true) ? 0 : 1;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Tue Apr 28 10:55:30 2009
@@ -255,6 +255,41 @@
}
/**
+ * Turn speculative execution on or off for this job.
+ *
+ * @param speculativeExecution <code>true</code> if speculative execution
+ * should be turned on, else <code>false</code>.
+ */
+ public void setSpeculativeExecution(boolean speculativeExecution) {
+ ensureState(JobState.DEFINE);
+ conf.setSpeculativeExecution(speculativeExecution);
+ }
+
+ /**
+ * Turn speculative execution on or off for this job for map tasks.
+ *
+ * @param speculativeExecution <code>true</code> if speculative execution
+ * should be turned on for map tasks,
+ * else <code>false</code>.
+ */
+ public void setMapSpeculativeExecution(boolean speculativeExecution) {
+ ensureState(JobState.DEFINE);
+ conf.setMapSpeculativeExecution(speculativeExecution);
+ }
+
+ /**
+ * Turn speculative execution on or off for this job for reduce tasks.
+ *
+ * @param speculativeExecution <code>true</code> if speculative execution
+ * should be turned on for reduce tasks,
+ * else <code>false</code>.
+ */
+ public void setReduceSpeculativeExecution(boolean speculativeExecution) {
+ ensureState(JobState.DEFINE);
+ conf.setReduceSpeculativeExecution(speculativeExecution);
+ }
+
+ /**
* Get the URL where some job progress information will be displayed.
*
* @return the URL where some job progress information will be displayed.
@@ -265,6 +300,16 @@
}
/**
+ * Get the job identifier.
+ *
+ * @return the job identifier.
+ */
+ public JobID getID() {
+ ensureState(JobState.RUNNING);
+ return info.getID();
+ }
+
+ /**
* Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
* and 1.0. When all map tasks have completed, the function returns 1.0.
*
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Tue Apr 28 10:55:30 2009
@@ -34,6 +34,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -78,23 +79,27 @@
assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
}
- public void testAllEnabledACLForJobSubmission() throws IOException {
+ public void testAllEnabledACLForJobSubmission()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
verifyJobSubmission(conf, true);
}
- public void testAllDisabledACLForJobSubmission() throws IOException {
+ public void testAllDisabledACLForJobSubmission()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
verifyJobSubmission(conf, false);
}
- public void testUserDisabledACLForJobSubmission() throws IOException {
+ public void testUserDisabledACLForJobSubmission()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
"3698-non-existent-user");
verifyJobSubmission(conf, false);
}
- public void testDisabledACLForNonDefaultQueue() throws IOException {
+ public void testDisabledACLForNonDefaultQueue()
+ throws IOException, InterruptedException, ClassNotFoundException {
// allow everyone in default queue
JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
// setup a different queue
@@ -105,13 +110,14 @@
verifyJobSubmission(conf, false, "q1");
}
- public void testSubmissionToInvalidQueue() throws IOException{
+ public void testSubmissionToInvalidQueue()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = new JobConf();
conf.set("mapred.queue.names","default");
setUpCluster(conf);
String queueName = "q1";
try {
- RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
+ Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
return;
@@ -122,7 +128,7 @@
}
public void testEnabledACLForNonDefaultQueue() throws IOException,
- LoginException {
+ LoginException, InterruptedException, ClassNotFoundException {
// login as self...
UserGroupInformation ugi = UnixUserGroupInformation.login();
String userName = ugi.getUserName();
@@ -137,7 +143,8 @@
}
public void testUserEnabledACLForJobSubmission()
- throws IOException, LoginException {
+ throws IOException, LoginException,
+ InterruptedException, ClassNotFoundException {
// login as self...
UserGroupInformation ugi = UnixUserGroupInformation.login();
String userName = ugi.getUserName();
@@ -148,7 +155,8 @@
}
public void testGroupsEnabledACLForJobSubmission()
- throws IOException, LoginException {
+ throws IOException, LoginException,
+ InterruptedException, ClassNotFoundException {
// login as self, get one group, and add in allowed list.
UserGroupInformation ugi = UnixUserGroupInformation.login();
String[] groups = ugi.getGroupNames();
@@ -160,23 +168,27 @@
verifyJobSubmission(conf, true);
}
- public void testAllEnabledACLForJobKill() throws IOException {
+ public void testAllEnabledACLForJobKill()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
verifyJobKill(conf, true);
}
- public void testAllDisabledACLForJobKill() throws IOException {
+ public void testAllDisabledACLForJobKill()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
}
- public void testOwnerAllowedForJobKill() throws IOException {
+ public void testOwnerAllowedForJobKill()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
"junk-user");
verifyJobKill(conf, true);
}
- public void testUserDisabledACLForJobKill() throws IOException {
+ public void testUserDisabledACLForJobKill()
+ throws IOException, InterruptedException, ClassNotFoundException {
//setup a cluster allowing a user to submit
JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
"dummy-user");
@@ -184,7 +196,7 @@
}
public void testUserEnabledACLForJobKill() throws IOException,
- LoginException {
+ LoginException, InterruptedException, ClassNotFoundException {
// login as self...
UserGroupInformation ugi = UnixUserGroupInformation.login();
String userName = ugi.getUserName();
@@ -193,7 +205,8 @@
verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
}
- public void testUserDisabledForJobPriorityChange() throws IOException {
+ public void testUserDisabledForJobPriorityChange()
+ throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
"junk-user");
verifyJobPriorityChangeAsOtherUser(conf, false,
@@ -379,12 +392,13 @@
}
private void verifyJobSubmission(JobConf conf, boolean shouldSucceed)
- throws IOException {
+ throws IOException, InterruptedException, ClassNotFoundException {
verifyJobSubmission(conf, shouldSucceed, "default");
}
private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
- String queue) throws IOException {
+ String queue)
+ throws IOException, InterruptedException, ClassNotFoundException {
setUpCluster(conf);
try {
runAndVerifySubmission(conf, shouldSucceed, queue, null);
@@ -395,9 +409,9 @@
private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
String queue, String userInfo)
- throws IOException {
+ throws IOException, InterruptedException, ClassNotFoundException {
try {
- RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, userInfo, queue);
+ Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
if (shouldSucceed) {
assertTrue(rjob.isSuccessful());
} else {
@@ -428,10 +442,10 @@
}
private void verifyJobKill(JobConf conf, boolean shouldSucceed)
- throws IOException {
+ throws IOException, InterruptedException, ClassNotFoundException {
setUpCluster(conf);
try {
- RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
+ Job rjob = submitSleepJob(1, 1, 1000, 1000, false);
assertFalse(rjob.isComplete());
while(rjob.mapProgress() == 0.0f) {
try {
@@ -441,7 +455,7 @@
}
}
rjob.killJob();
- while(rjob.cleanupProgress() == 0.0f) {
+ while (!rjob.isComplete()) {
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
@@ -449,7 +463,7 @@
}
}
if (shouldSucceed) {
- assertTrue(rjob.isComplete());
+ assertTrue(!rjob.isSuccessful());
} else {
fail("Job kill should have failed.");
}
@@ -470,16 +484,18 @@
private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
String otherUserInfo)
- throws IOException {
+ throws IOException, InterruptedException, ClassNotFoundException {
setUpCluster(conf);
try {
// submit a job as another user.
String userInfo = otherUserInfo;
- RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(rjob.isComplete());
+ Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+ assertFalse(job.isComplete());
//try to kill as self
try {
+ JobClient jc = new JobClient(miniMRCluster.createJobConf());
+ RunningJob rjob = jc.getJob((JobID)job.getID());
rjob.killJob();
if (!shouldSucceed) {
fail("should fail kill operation");
@@ -495,7 +511,7 @@
"ADMINISTER_JOBS on queue default"));
}
//wait for job to complete on its own
- while (!rjob.isComplete()) {
+ while (!job.isComplete()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
@@ -509,16 +525,18 @@
private void verifyJobPriorityChangeAsOtherUser(JobConf conf,
boolean shouldSucceed, String otherUserInfo)
- throws IOException {
+ throws IOException, InterruptedException, ClassNotFoundException {
setUpCluster(conf);
try {
// submit job as another user.
String userInfo = otherUserInfo;
- RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(rjob.isComplete());
+ Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
+ assertFalse(job.isComplete());
// try to change priority as self
try {
+ JobClient jc = new JobClient(miniMRCluster.createJobConf());
+ RunningJob rjob = jc.getJob((JobID)job.getID());
rjob.setJobPriority("VERY_LOW");
if (!shouldSucceed) {
fail("changing priority should fail.");
@@ -531,7 +549,7 @@
"ADMINISTER_JOBS on queue default"));
}
//wait for job to complete on its own
- while (!rjob.isComplete()) {
+ while (!job.isComplete()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
@@ -556,49 +574,47 @@
if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
}
- private RunningJob submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete)
- throws IOException {
+ private Job submitSleepJob(int numMappers, int numReducers,
+ long mapSleepTime, long reduceSleepTime,
+ boolean shouldComplete)
+ throws IOException, InterruptedException, ClassNotFoundException {
return submitSleepJob(numMappers, numReducers, mapSleepTime,
reduceSleepTime, shouldComplete, null);
}
- private RunningJob submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo)
- throws IOException {
+ private Job submitSleepJob(int numMappers, int numReducers,
+ long mapSleepTime, long reduceSleepTime,
+ boolean shouldComplete, String userInfo)
+ throws IOException, InterruptedException, ClassNotFoundException {
return submitSleepJob(numMappers, numReducers, mapSleepTime,
reduceSleepTime, shouldComplete, userInfo, null);
}
- private RunningJob submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo,
- String queueName)
- throws IOException {
- JobConf clientConf = new JobConf();
+ private Job submitSleepJob(int numMappers, int numReducers,
+ long mapSleepTime, long reduceSleepTime,
+ boolean shouldComplete, String userInfo,
+ String queueName)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration clientConf = new Configuration();
clientConf.set("mapred.job.tracker", "localhost:"
+ miniMRCluster.getJobTrackerPort());
- SleepJob job = new SleepJob();
- job.setConf(clientConf);
- clientConf = job.setupJobConf(numMappers, numReducers,
- mapSleepTime, (int)mapSleepTime/100,
- reduceSleepTime, (int)reduceSleepTime/100);
- if (queueName != null) {
- clientConf.setQueueName(queueName);
- }
- JobConf jc = new JobConf(clientConf);
if (userInfo != null) {
- jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+ clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+ }
+ if (queueName != null) {
+ clientConf.set("mapred.job.queue.name", queueName);
}
- RunningJob rJob = null;
+ SleepJob sleep = new SleepJob();
+ sleep.setConf(clientConf);
+ Job job = sleep.createJob(numMappers, numReducers,
+ mapSleepTime, (int)mapSleepTime/100,
+ reduceSleepTime, (int)reduceSleepTime/100);
if (shouldComplete) {
- rJob = JobClient.runJob(jc);
+ job.waitForCompletion(false);
} else {
- rJob = new JobClient(clientConf).submitJob(jc);
+ job.submit();
}
- return rJob;
+ return job;
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Apr 28 10:55:30 2009
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.MemoryCalculatorPlugin;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
@@ -66,9 +67,9 @@
}
}
- private void runSleepJob(JobConf conf) throws Exception {
+ private int runSleepJob(JobConf conf) throws Exception {
String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
- ToolRunner.run(conf, new SleepJob(), args);
+ return ToolRunner.run(conf, new SleepJob(), args);
}
private void runAndCheckSuccessfulJob(JobConf conf)
@@ -87,16 +88,15 @@
Matcher mat = null;
// Start the job.
- boolean success = true;
+ int ret;
try {
- runSleepJob(conf);
- success = true;
+ ret = runSleepJob(conf);
} catch (Exception e) {
- success = false;
+ ret = 1;
}
// Job has to succeed
- assertTrue(success);
+ assertTrue(ret == 0);
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
@@ -279,16 +279,15 @@
+ nn.getNameNodeAddress().getPort());
// Start the job.
- boolean success = true;
+ int ret = 0;
try {
- runSleepJob(conf);
- success = true;
+ ret = runSleepJob(conf);
} catch (Exception e) {
- success = false;
+ ret = 1;
}
// Job has to fail
- assertFalse(success);
+ assertTrue(ret != 0);
JobClient jClient = new JobClient(conf);
JobStatus[] jStatus = jClient.getAllJobs();
@@ -382,12 +381,13 @@
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
// Start the job
- RunningJob job =
- jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
+ Job job = sleepJob.createJob(1, 1, 5000, 1, 1000, 1);
+ job.submit();
boolean TTOverFlowMsgPresent = false;
while (true) {
// Set-up tasks are the first to be launched.
- TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
+ TaskReport[] setUpReports = jClient.getSetupTaskReports(
+ (org.apache.hadoop.mapred.JobID)job.getID());
for (TaskReport tr : setUpReports) {
String[] diag = tr.getDiagnostics();
for (String str : diag) {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=769330&r1=769329&r2=769330&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Tue Apr 28 10:55:30 2009
@@ -20,12 +20,11 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob.SleepInputFormat;
+import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import junit.framework.TestCase;
@@ -37,16 +36,16 @@
final Path inDir = new Path("/testing");
final Path outDir = new Path("/output");
- public static class SleepJobFailOnHost extends MapReduceBase
- implements Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ public static class FailOnHostMapper extends MapReduceBase
+ implements Mapper<Text, Text, Text, Text> {
String hostname = "";
public void configure(JobConf job) {
this.hostname = job.get("slave.host.name");
}
- public void map(IntWritable key, IntWritable value,
- OutputCollector<IntWritable, NullWritable> output,
+ public void map(Text key, Text value,
+ OutputCollector<Text, Text> output,
Reporter reporter)
throws IOException {
if (this.hostname.equals(hosts[0])) {
@@ -81,11 +80,11 @@
job.setInt("mapred.max.tracker.failures", 1);
job.setNumMapTasks(30);
job.setNumReduceTasks(0);
- job.setMapperClass(SleepJobFailOnHost.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(NullWritable.class);
+ job.setMapperClass(FailOnHostMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
job.setOutputFormat(NullOutputFormat.class);
- job.setInputFormat(SleepInputFormat.class);
+ job.setInputFormat(RandomInputFormat.class);
FileInputFormat.setInputPaths(job, inDir);
FileOutputFormat.setOutputPath(job, outDir);