Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/13 09:19:27 UTC
svn commit: r1102584 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
mr-client/hadoop-map...
Author: sharad
Date: Fri May 13 07:19:26 2011
New Revision: 1102584
URL: http://svn.apache.org/viewvc?rev=1102584&view=rev
Log:
Support mapreduce 0.20 APIs.
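In practice this means the MR-279 application master no longer assumes the new org.apache.hadoop.mapreduce output API: jobs written against the old 0.20 org.apache.hadoop.mapred API now get an old-style OutputCommitter instantiated from mapred.output.committer.class (FileOutputCommitter by default), as the JobImpl diff below shows. A minimal sketch of an old-API driver that exercises this path, modeled on the new TestMROldApiJobs test (the class name, input/output arguments, and the standalone main() are illustrative, not part of this commit):

    // Hypothetical old-API (0.20) job driver; mirrors TestMROldApiJobs.runJob().
    import java.io.IOException;
    import org.apache.hadoop.CustomOutputCommitter;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.IdentityReducer;

    public class OldApiJobDriver {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        conf.setJobName("old-api-demo");
        conf.setMapperClass(IdentityMapper.class);     // old mapred.* classes, so the
        conf.setReducerClass(IdentityReducer.class);   // mapred.*.new-api flags stay false
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        // Recorded under "mapred.output.committer.class" and picked up by JobImpl:
        conf.setOutputCommitter(CustomOutputCommitter.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);   // blocks until the job completes
      }
    }
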
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri May 13 07:19:26 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ Support mapreduce old (0.20) APIs. (sharad)
Fixed reservation's bad interaction with delay scheduling in CS.
(acmurthy)
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java Fri May 13 07:19:26 2011
@@ -0,0 +1,67 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+
+public class CustomOutputCommitter extends OutputCommitter {
+
+ public static final String JOB_SETUP_FILE_NAME = "_job_setup";
+ public static final String JOB_COMMIT_FILE_NAME = "_job_commit";
+ public static final String JOB_ABORT_FILE_NAME = "_job_abort";
+ public static final String TASK_SETUP_FILE_NAME = "_task_setup";
+ public static final String TASK_ABORT_FILE_NAME = "_task_abort";
+ public static final String TASK_COMMIT_FILE_NAME = "_task_commit";
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ writeFile(jobContext.getJobConf(), JOB_SETUP_FILE_NAME);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ super.commitJob(jobContext);
+ writeFile(jobContext.getJobConf(), JOB_COMMIT_FILE_NAME);
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ super.abortJob(jobContext, status);
+ writeFile(jobContext.getJobConf(), JOB_ABORT_FILE_NAME);
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ writeFile(taskContext.getJobConf(), TASK_SETUP_FILE_NAME);
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ writeFile(taskContext.getJobConf(), TASK_COMMIT_FILE_NAME);
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ writeFile(taskContext.getJobConf(), TASK_ABORT_FILE_NAME);
+ }
+
+ private void writeFile(JobConf conf , String filename) throws IOException {
+ System.out.println("writing file ----" + filename);
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ FileSystem fs = outputPath.getFileSystem(conf);
+ fs.create(new Path(outputPath, filename)).close();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java Fri May 13 07:19:26 2011
@@ -0,0 +1,23 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+// Mapper that fails
+public class FailMapper extends MapReduceBase implements
+ Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+ throws IOException {
+ // NOTE- the next line is required for the TestDebugScript test to succeed
+ System.err.println("failing map");
+ throw new RuntimeException("failing map");
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri May 13 07:19:26 2011
@@ -39,7 +39,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -57,8 +59,8 @@ import org.apache.hadoop.mapreduce.lib.c
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
@@ -693,24 +695,8 @@ public class JobImpl implements org.apac
job.startTime = job.clock.getTime();
try {
setup(job);
- job.jobContext = new JobContextImpl(job.conf,
- job.oldJobId);
job.fs = FileSystem.get(job.conf);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId
- attemptID = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
- attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
- attemptID.getTaskId().setJobId(job.jobId);
- attemptID.getTaskId().setTaskType(TaskType.MAP);//TODO:fix task type ??
-
- TaskAttemptContext taskContext =
- new TaskAttemptContextImpl(job.conf,
- TypeConverter.fromYarn(attemptID));
- OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
- .getOutputFormatClass(), job.conf);
- // TODO: support for old/new committer..
- job.committer = outputFormat.getOutputCommitter(taskContext);
-
//log to job history
//TODO_JH_Validate the values being sent here (along with defaults). Ideally for all JH evnts.
JobSubmittedEvent jse =
@@ -727,6 +713,43 @@ public class JobImpl implements org.apac
checkTaskLimits();
+
+ boolean newApiCommitter = false;
+ if ((job.numReduceTasks > 0 &&
+ job.conf.getBoolean("mapred.reducer.new-api", false)) ||
+ (job.numReduceTasks == 0 &&
+ job.conf.getBoolean("mapred.mapper.new-api", false))) {
+ newApiCommitter = true;
+ LOG.info("Using mapred newApiCommitter.");
+ }
+
+ LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
+
+ if (newApiCommitter) {
+ job.jobContext = new JobContextImpl(job.conf,
+ job.oldJobId);
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
+ .getRecordFactory(null)
+ .newRecordInstance(
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
+ attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
+ .newRecordInstance(TaskId.class));
+ attemptID.getTaskId().setJobId(job.jobId);
+ attemptID.getTaskId().setTaskType(TaskType.MAP);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
+ TypeConverter.fromYarn(attemptID));
+ OutputFormat outputFormat = ReflectionUtils.newInstance(
+ taskContext.getOutputFormatClass(), job.conf);
+ job.committer = outputFormat.getOutputCommitter(taskContext);
+ } else {
+ job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+ new JobConf(job.conf), job.oldJobId);
+ job.committer = ReflectionUtils.newInstance(
+ job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
+ }
+ LOG.info("OutputCommitter is " + job.committer.getClass().getName());
+
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
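The core of the JobImpl change is the committer selection added above: a job counts as "new API" only when its reduce phase uses the new API (or its map phase does, for map-only jobs); everything else now falls into the else branch, where the old org.apache.hadoop.mapred.OutputCommitter named by mapred.output.committer.class (FileOutputCommitter by default) is built against an old-API JobContextImpl. Restated in isolation (the helper name is illustrative, not from this patch):

    // Illustrative restatement of the selection logic above; not part of the patch.
    static boolean usesNewApiCommitter(JobConf conf, int numReduceTasks) {
      return numReduceTasks > 0
          ? conf.getBoolean("mapred.reducer.new-api", false)
          : conf.getBoolean("mapred.mapper.new-api", false);
    }

Old-API jobs submitted through JobClient leave both flags at their false defaults, so they always take the mapred path.
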
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java Fri May 13 07:19:26 2011
@@ -33,14 +33,14 @@ public class JobContextImpl
private JobConf job;
private Progressable progress;
- JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId,
+ public JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId,
Progressable progress) {
super(conf, jobId);
this.job = conf;
this.progress = progress;
}
- JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
+ public JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
this(conf, jobId, Reporter.NULL);
}
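The visibility change here is what the new else branch in JobImpl relies on: JobImpl lives in org.apache.hadoop.mapreduce.v2.app.job.impl, outside the mapred package, so the previously package-private constructors of the old-API JobContextImpl have to be public for the application master to construct one directly.
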
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java Fri May 13 07:19:26 2011
@@ -0,0 +1,196 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.CustomOutputCommitter;
+import org.apache.hadoop.FailMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMROldApiJobs {
+
+ private static final Log LOG = LogFactory.getLog(TestMROldApiJobs.class);
+
+ protected static MiniMRYarnCluster mrCluster;
+
+ @BeforeClass
+ public static void setup() {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster == null) {
+ mrCluster = new MiniMRYarnCluster(TestMROldApiJobs.class.getName());
+ mrCluster.init(new Configuration());
+ mrCluster.start();
+ }
+
+ // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+ // for corresponding uberized tests.
+ mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (mrCluster != null) {
+ mrCluster.stop();
+ mrCluster = null;
+ }
+ }
+
+ @Test
+ public void testJobSucceed() throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ LOG.info("\n\n\nStarting testJobSucceed().");
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ JobConf conf = new JobConf(mrCluster.getConfig());
+
+ Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "in");
+ Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "out");
+ runJobSucceed(conf, in, out);
+
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_SETUP_FILE_NAME)));
+ Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.JOB_ABORT_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_COMMIT_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_SETUP_FILE_NAME)));
+ Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.TASK_ABORT_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_COMMIT_FILE_NAME)));
+ }
+
+ @Test
+ public void testJobFail() throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ LOG.info("\n\n\nStarting testJobFail().");
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ JobConf conf = new JobConf(mrCluster.getConfig());
+
+ Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "fail-in");
+ Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "fail-out");
+ runJobFail(conf, in, out);
+
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_SETUP_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_ABORT_FILE_NAME)));
+ Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.JOB_COMMIT_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_SETUP_FILE_NAME)));
+ Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_ABORT_FILE_NAME)));
+ Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.TASK_COMMIT_FILE_NAME)));
+ }
+
+ //Run a job that will be failed and wait until it completes
+ public static void runJobFail(JobConf conf, Path inDir, Path outDir)
+ throws IOException, InterruptedException {
+ conf.setJobName("test-job-fail");
+ conf.setMapperClass(FailMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ conf.setMaxMapAttempts(1);
+
+ boolean success = runJob(conf, inDir, outDir, 1, 0);
+ Assert.assertFalse("Job expected to fail succeeded", success);
+ }
+
+ //Run a job that will be succeeded and wait until it completes
+ public static void runJobSucceed(JobConf conf, Path inDir, Path outDir)
+ throws IOException, InterruptedException {
+ conf.setJobName("test-job-succeed");
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ boolean success = runJob(conf, inDir, outDir, 1 , 1);
+ Assert.assertTrue("Job expected to succeed failed", success);
+ }
+
+ static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+ int numReds) throws IOException, InterruptedException {
+
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+ if (!fs.exists(inDir)) {
+ fs.mkdirs(inDir);
+ }
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+ for (int i = 0; i < numMaps; ++i) {
+ DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+ file.writeBytes(input);
+ file.close();
+ }
+
+ conf.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ conf.setOutputCommitter(CustomOutputCommitter.class);
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputKeyClass(LongWritable.class);
+ conf.setOutputValueClass(Text.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(numMaps);
+ conf.setNumReduceTasks(numReds);
+
+ JobClient jobClient = new JobClient(conf);
+ RunningJob job = jobClient.submitJob(conf);
+ return jobClient.monitorAndPrintJob(conf, job);
+ }
+}