You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/05/13 09:19:27 UTC

svn commit: r1102584 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ mr-client/hadoop-map...

Author: sharad
Date: Fri May 13 07:19:26 2011
New Revision: 1102584

URL: http://svn.apache.org/viewvc?rev=1102584&view=rev
Log:
Support mapreduce 0.20 APIs.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri May 13 07:19:26 2011
@@ -3,6 +3,7 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    Support mapreduce old (0.20) APIs. (sharad)
 
     Fixed reservation's bad interaction with delay scheduling in CS.
     (acmurthy)

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/CustomOutputCommitter.java Fri May 13 07:19:26 2011
@@ -0,0 +1,67 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+
+public class CustomOutputCommitter extends OutputCommitter {
+
+  public static final String JOB_SETUP_FILE_NAME = "_job_setup";
+  public static final String JOB_COMMIT_FILE_NAME = "_job_commit";
+  public static final String JOB_ABORT_FILE_NAME = "_job_abort";
+  public static final String TASK_SETUP_FILE_NAME = "_task_setup";
+  public static final String TASK_ABORT_FILE_NAME = "_task_abort";
+  public static final String TASK_COMMIT_FILE_NAME = "_task_commit";
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    writeFile(jobContext.getJobConf(), JOB_SETUP_FILE_NAME);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    super.commitJob(jobContext);
+    writeFile(jobContext.getJobConf(), JOB_COMMIT_FILE_NAME);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    super.abortJob(jobContext, status);
+    writeFile(jobContext.getJobConf(), JOB_ABORT_FILE_NAME);
+  }
+  
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    writeFile(taskContext.getJobConf(), TASK_SETUP_FILE_NAME);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+    return true;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    writeFile(taskContext.getJobConf(), TASK_COMMIT_FILE_NAME);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    writeFile(taskContext.getJobConf(), TASK_ABORT_FILE_NAME);
+  }
+
+  private void writeFile(JobConf conf , String filename) throws IOException {
+    System.out.println("writing file ----" + filename);
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    FileSystem fs = outputPath.getFileSystem(conf);
+    fs.create(new Path(outputPath, filename)).close();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailMapper.java Fri May 13 07:19:26 2011
@@ -0,0 +1,23 @@
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Old-API mapper whose {@code map} call always fails: it reports the failure
+ * on standard error and then throws an unchecked exception.
+ */
+public class FailMapper extends MapReduceBase implements
+    Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+  /** Text printed to stderr and carried by the thrown exception. */
+  private static final String FAILURE_MESSAGE = "failing map";
+
+  /**
+   * Never completes normally; the arguments are ignored.
+   *
+   * @throws RuntimeException always
+   */
+  public void map(WritableComparable key, Writable value,
+      OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+      throws IOException {
+    // NOTE: the stderr output below is required for the TestDebugScript test
+    // to succeed.
+    System.err.println(FAILURE_MESSAGE);
+    throw new RuntimeException(FAILURE_MESSAGE);
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri May 13 07:19:26 2011
@@ -39,7 +39,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -57,8 +59,8 @@ import org.apache.hadoop.mapreduce.lib.c
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
@@ -693,24 +695,8 @@ public class JobImpl implements org.apac
       job.startTime = job.clock.getTime();
       try {
         setup(job);
-        job.jobContext = new JobContextImpl(job.conf,
-            job.oldJobId);
         job.fs = FileSystem.get(job.conf);
 
-        org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId 
-            attemptID = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
-        attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
-        attemptID.getTaskId().setJobId(job.jobId);
-        attemptID.getTaskId().setTaskType(TaskType.MAP);//TODO:fix task type ??
-        
-        TaskAttemptContext taskContext =
-            new TaskAttemptContextImpl(job.conf,
-                TypeConverter.fromYarn(attemptID));
-        OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), job.conf);
-        // TODO: support for old/new committer..
-        job.committer = outputFormat.getOutputCommitter(taskContext);
-
         //log to job history
         //TODO_JH_Validate the values being sent here (along with defaults). Ideally for all JH evnts.
         JobSubmittedEvent jse =
@@ -727,6 +713,43 @@ public class JobImpl implements org.apac
 
         checkTaskLimits();
 
+        
+        boolean newApiCommitter = false;
+        if ((job.numReduceTasks > 0 && 
+            job.conf.getBoolean("mapred.reducer.new-api", false)) ||
+              (job.numReduceTasks == 0 && 
+               job.conf.getBoolean("mapred.mapper.new-api", false)))  {
+          newApiCommitter = true;
+          LOG.info("Using mapred newApiCommitter.");
+        }
+        
+        LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
+        
+        if (newApiCommitter) {
+          job.jobContext = new JobContextImpl(job.conf,
+              job.oldJobId);
+          org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
+              .getRecordFactory(null)
+              .newRecordInstance(
+                  org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
+          attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
+              .newRecordInstance(TaskId.class));
+          attemptID.getTaskId().setJobId(job.jobId);
+          attemptID.getTaskId().setTaskType(TaskType.MAP);
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.conf,
+              TypeConverter.fromYarn(attemptID));
+          OutputFormat outputFormat = ReflectionUtils.newInstance(
+              taskContext.getOutputFormatClass(), job.conf);
+          job.committer = outputFormat.getOutputCommitter(taskContext);
+        } else {
+          job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+              new JobConf(job.conf), job.oldJobId);
+          job.committer = ReflectionUtils.newInstance(
+              job.conf.getClass("mapred.output.committer.class", FileOutputCommitter.class,
+              org.apache.hadoop.mapred.OutputCommitter.class), job.conf);
+        }
+        LOG.info("OutputCommitter is " + job.committer.getClass().getName());
+        
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java?rev=1102584&r1=1102583&r2=1102584&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobContextImpl.java Fri May 13 07:19:26 2011
@@ -33,14 +33,14 @@ public class JobContextImpl 
   private JobConf job;
   private Progressable progress;
 
-  JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
+  public JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId, 
                  Progressable progress) {
     super(conf, jobId);
     this.job = conf;
     this.progress = progress;
   }
 
-  JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
+  public JobContextImpl(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
     this(conf, jobId, Reporter.NULL);
   }
   

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java?rev=1102584&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMROldApiJobs.java Fri May 13 07:19:26 2011
@@ -0,0 +1,196 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.CustomOutputCommitter;
+import org.apache.hadoop.FailMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Runs old-API ({@code org.apache.hadoop.mapred}) jobs on a
+ * {@link MiniMRYarnCluster} with {@link CustomOutputCommitter} installed and
+ * checks, through the marker files that committer leaves in the output
+ * directory, which committer callbacks ran for a succeeding and for a failing
+ * job. Every test is silently skipped when the MR application jar is absent.
+ */
+public class TestMROldApiJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMROldApiJobs.class);
+
+  protected static MiniMRYarnCluster mrCluster;
+
+  /**
+   * Starts the shared mini cluster if it is not running yet and disables
+   * uber-task mode. Does nothing when the MR application jar is missing.
+   */
+  @BeforeClass
+  public static void setup() {
+
+    if (appJarMissing()) {
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(TestMROldApiJobs.class.getName());
+      mrCluster.init(new Configuration());
+      mrCluster.start();
+    }
+
+    // TestMROldApiJobs is for testing non-uberized operation only; see
+    // TestUberAM for uberized tests.
+    mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+  }
+
+  /** Stops the shared mini cluster, if one was started. */
+  @AfterClass
+  public static void tearDown() {
+    if (mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+  }
+
+  /**
+   * A succeeding job must leave the job/task setup and commit markers and
+   * must not leave any abort marker.
+   */
+  @Test
+  public void testJobSucceed() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testJobSucceed().");
+
+    if (appJarMissing()) {
+      return;
+    }
+
+    JobConf conf = new JobConf(mrCluster.getConfig());
+
+    Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+      "in");
+    Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+      "out");
+    runJobSucceed(conf, in, out);
+
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_SETUP_FILE_NAME)));
+    Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.JOB_ABORT_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_COMMIT_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_SETUP_FILE_NAME)));
+    Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.TASK_ABORT_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_COMMIT_FILE_NAME)));
+  }
+
+  /**
+   * A failing job must leave the job/task setup and abort markers and must
+   * not leave any commit marker.
+   */
+  @Test
+  public void testJobFail() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testJobFail().");
+
+    if (appJarMissing()) {
+      return;
+    }
+
+    JobConf conf = new JobConf(mrCluster.getConfig());
+
+    Path in = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+      "fail-in");
+    Path out = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+      "fail-out");
+    runJobFail(conf, in, out);
+
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_SETUP_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.JOB_ABORT_FILE_NAME)));
+    Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.JOB_COMMIT_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_SETUP_FILE_NAME)));
+    Assert.assertTrue(fs.exists(new Path(out, CustomOutputCommitter.TASK_ABORT_FILE_NAME)));
+    Assert.assertFalse(fs.exists(new Path(out, CustomOutputCommitter.TASK_COMMIT_FILE_NAME)));
+  }
+
+  /**
+   * Runs a job that is expected to fail, waits for it to complete, and
+   * asserts that it did not succeed.
+   */
+  public static void runJobFail(JobConf conf, Path inDir, Path outDir)
+         throws IOException, InterruptedException {
+    conf.setJobName("test-job-fail");
+    conf.setMapperClass(FailMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    // A single attempt, so the first map failure is final.
+    conf.setMaxMapAttempts(1);
+
+    boolean success = runJob(conf, inDir, outDir, 1, 0);
+    Assert.assertFalse("Job expected to fail succeeded", success);
+  }
+
+  /**
+   * Runs a job that is expected to succeed, waits for it to complete, and
+   * asserts that it succeeded.
+   */
+  public static void runJobSucceed(JobConf conf, Path inDir, Path outDir)
+         throws IOException, InterruptedException {
+    conf.setJobName("test-job-succeed");
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    boolean success = runJob(conf, inDir, outDir, 1 , 1);
+    Assert.assertTrue("Job expected to succeed failed", success);
+  }
+
+  /**
+   * Prepares input and output directories, finishes configuring the job
+   * (including installing {@link CustomOutputCommitter}), submits it and
+   * blocks until it completes.
+   *
+   * @param conf job configuration; mapper/reducer classes are set by the caller
+   * @param inDir input directory; created if absent, one file written per map
+   * @param outDir output directory; deleted recursively first if it exists
+   * @param numMaps number of input files to write and map tasks to request
+   * @param numReds number of reduce tasks to request
+   * @return the result of {@code JobClient.monitorAndPrintJob}
+   */
+  static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
+                           int numReds) throws IOException, InterruptedException {
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (!fs.exists(inDir)) {
+      fs.mkdirs(inDir);
+    }
+    String input = "The quick brown fox\n" + "has many silly\n"
+        + "red fox sox\n";
+    for (int i = 0; i < numMaps; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      try {
+        file.writeBytes(input);
+      } finally {
+        // Close even when the write fails so the stream is not leaked.
+        file.close();
+      }
+    }
+
+    conf.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    conf.setOutputCommitter(CustomOutputCommitter.class);
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    return jobClient.monitorAndPrintJob(conf, job);
+  }
+
+  /**
+   * Tells whether the MR application jar is absent, logging the reason for
+   * skipping when it is.
+   *
+   * @return {@code true} if {@code MiniMRYarnCluster.APPJAR} does not exist
+   */
+  private static boolean appJarMissing() {
+    if (new File(MiniMRYarnCluster.APPJAR).exists()) {
+      return false;
+    }
+    LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+             + " not found. Not running test.");
+    return true;
+  }
+}