You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [2/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...

Modified: hadoop/mapreduce/branches/MR-279/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/.gitignore?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/.gitignore (original)
+++ hadoop/mapreduce/branches/MR-279/.gitignore Thu Mar 17 20:21:13 2011
@@ -42,3 +42,7 @@ src/docs/build
 src/docs/cn/build
 src/docs/cn/src/documentation/sitemap.xmap
 src/docs/cn/uming.conf
+.gitignore
+target
+SecurityAuth.audit
+conf/yarn-site.xml

Added: hadoop/mapreduce/branches/MR-279/INSTALL
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/INSTALL?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/INSTALL (added)
+++ hadoop/mapreduce/branches/MR-279/INSTALL Thu Mar 17 20:21:13 2011
@@ -0,0 +1,61 @@
+To compile Hadoop MapReduce Next, do the following:
+
+Step 1) Download Hadoop Common
+
+svn checkout http://svn.apache.org/repos/asf/hadoop/common/branches/yahoo-merge/
+ant veryclean mvn-install 
+
+Step 2) Download Hadoop HDFS 
+
+svn checkout http://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052/
+ant veryclean mvn-install -Dresolvers=internal 
+
+Step 3) Go to the root directory of hadoop mapreduce
+
+Step 4) Run 
+
+mvn clean install assembly:assembly
+ant veryclean jar jar-test  -Dresolvers=internal 
+
+In case you want to skip the tests run:
+
+mvn clean install assembly:assembly -Dmaven.test.skip.exec=true
+ant veryclean jar jar-test  -Dresolvers=internal 
+
+You will see a tarball in
+ls target/hadoop-mapreduce-1.0-SNAPSHOT-bin.tar.gz  
+
+Step 5) Untar the tarball into a clean, separate directory,
+referred to below as HADOOP_YARN_INSTALL.
+
+To run Hadoop MapReduce Next applications:
+
+Step 6) cd $HADOOP_YARN_INSTALL
+
+Step 7) export the following variables:
+
+HADOOP_MAPRED_HOME=
+HADOOP_COMMON_HOME=
+HADOOP_HDFS_HOME=
+YARN_HOME=directory where you untarred yarn
+HADOOP_CONF_DIR=
+YARN_CONF_DIR=$HADOOP_CONF_DIR
+
+Step 8) bin/yarn-daemon.sh start resourcemanager
+
+Step 9) bin/yarn-daemon.sh start nodemanager
+
+Step 10) Create the following symlinks in hadoop-common/lib 
+
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar .	
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-api-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-common-1.0-SNAPSHOT.jar .	
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-common-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-core-1.0-SNAPSHOT.jar .	
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-server-common-1.0-SNAPSHOT.jar .
+
+Step 11) You are all set. An example of how to run a job:
+
+$HADOOP_COMMON_HOME/bin/hadoop jar $HADOOP_MAPRED_HOME/build/hadoop-mapred-examples-0.22.0-SNAPSHOT.jar randomwriter -Dmapreduce.job.user.name=$USER -Dmapreduce.randomwriter.bytespermap=10000 -Ddfs.blocksize=536870912 -Ddfs.block.size=536870912 -libjars $HADOOP_YARN_INSTALL/hadoop-mapreduce-1.0-SNAPSHOT/modules/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar output 
+
+

Added: hadoop/mapreduce/branches/MR-279/assembly/all.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/assembly/all.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/assembly/all.xml (added)
+++ hadoop/mapreduce/branches/MR-279/assembly/all.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,59 @@
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <id>bin</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>true</includeBaseDirectory>
+  <!-- TODO: this layout is wrong. We need module specific bin files in module specific dirs -->
+  <fileSets>
+    <fileSet>
+      <directory>yarn/yarn-server/yarn-server-nodemanager/target/classes/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <includes>
+        <include>container-executor</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>yarn/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <includes>
+        <include>*</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>yarn/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+  <moduleSets>
+    <moduleSet>
+      <excludes>
+        <exclude>org.apache.hadoop:yarn-server-tests</exclude>
+      </excludes>
+      <binaries>
+        <outputDirectory>modules</outputDirectory>
+        <includeDependencies>false</includeDependencies>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
+  </moduleSets>
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>false</useProjectArtifact>
+      <outputDirectory>/lib</outputDirectory>
+      <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-hdfs</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

Modified: hadoop/mapreduce/branches/MR-279/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/build.xml?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/build.xml (original)
+++ hadoop/mapreduce/branches/MR-279/build.xml Thu Mar 17 20:21:13 2011
@@ -1354,10 +1354,8 @@
   </target>   
 
   <target name="clean-cache" description="Clean. Delete ivy cache">
-    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-common"/>
-    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-common-test"/>
-    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs"/>
-    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs-test"/>
+    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop"/>
+    <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop.mapreduce"/>
   </target>
 
   <target name="mvn-install-mapred" depends="mvn-taskdef,examples,tools,set-version">

Modified: hadoop/mapreduce/branches/MR-279/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy.xml?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy.xml (original)
+++ hadoop/mapreduce/branches/MR-279/ivy.xml Thu Mar 17 20:21:13 2011
@@ -72,6 +72,9 @@
    <dependency org="checkstyle" name="checkstyle" rev="${checkstyle.version}"
                conf="checkstyle->default"/>
 
+   <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" 
+               rev="${hadoop-mapreduce-client-core.version}" conf="common->default"/> 
+
    <dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
                conf="jdiff->default"/>
    <dependency org="xerces" name="xerces" rev="${xerces.version}"

Modified: hadoop/mapreduce/branches/MR-279/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy/libraries.properties?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy/libraries.properties (original)
+++ hadoop/mapreduce/branches/MR-279/ivy/libraries.properties Thu Mar 17 20:21:13 2011
@@ -32,3 +32,6 @@ jdiff.version=1.0.9
 rats-lib.version=0.6
 
 xerces.version=1.4.4
+
+yarn.version=1.0-SNAPSHOT
+hadoop-mapreduce-client-core.version=1.0-SNAPSHOT

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,100 @@
+<?xml version="1.0"?>
+<project>
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-app</artifactId>
+  <name>hadoop-mapreduce-client-app</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <!-- begin MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-api</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- end MNG-4223 workaround -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-common</artifactId>
+      <version>${yarn.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-common</artifactId>
+      <version>${yarn.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-nodemanager</artifactId>
+      <version>${yarn.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-resourcemanager</artifactId>
+      <version>${yarn.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.3.1</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>build-classpath</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <outputFile>target/classes/mrapp-generated-classpath</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A mapper that always fails. The attempt whose task-attempt id is 0 fails
+ * by throwing an {@link IOException}; every other attempt prints a message
+ * and terminates the JVM with {@code System.exit(-1)}.
+ */
+public class FailingMapper extends Mapper<Text, Text, Text, Text> {
+
+  /**
+   * Fails unconditionally; the key and value are never read and nothing is
+   * written to the context.
+   *
+   * @param key unused
+   * @param value unused
+   * @param context supplies the task attempt id that selects the failure mode
+   * @throws IOException when the task attempt id is 0
+   */
+  public void map(Text key, Text value,
+      Context context) throws IOException,InterruptedException {
+    final boolean firstAttempt = context.getTaskAttemptID().getId() == 0;
+
+    if (!firstAttempt) {
+      // Later attempts take the whole JVM down instead of throwing.
+      System.out.println("Attempt:" + context.getTaskAttemptID() + " Exiting");
+      System.exit(-1);
+      return;
+    }
+
+    // Same text goes to stdout and into the exception.
+    final String failure = "Attempt:" + context.getTaskAttemptID()
+        + " Failing mapper throwing exception";
+    System.out.println(failure);
+    throw new IOException(failure);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,758 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class RandomTextWriterJob extends Configured implements Tool {
+
+  public static final String TOTAL_BYTES = 
+    "mapreduce.randomtextwriter.totalbytes";
+  public static final String BYTES_PER_MAP = 
+    "mapreduce.randomtextwriter.bytespermap";
+  public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+  public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+  public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+  public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
+
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+
+  /**
+   * Builds a map-only job that writes random text as a sequence file.
+   * <p>
+   * The number of map tasks is {@code TOTAL_BYTES / BYTES_PER_MAP} (integer
+   * division; {@code TOTAL_BYTES} defaults to {@code BYTES_PER_MAP}, which
+   * defaults to 10 KiB). When the quotient is zero but a positive total was
+   * requested, a single map is used and {@code BYTES_PER_MAP} is lowered to
+   * the total. The map count is stored in {@code conf} under
+   * {@link MRJobConfig#NUM_MAPS}.
+   *
+   * @param conf job configuration; modified in place as described above
+   * @return the configured job; its output path is not set by this method
+   * @throws IllegalArgumentException if {@code BYTES_PER_MAP} is zero or
+   *         negative
+   * @throws IOException propagated from constructing the {@link Job}
+   */
+  public Job createJob(Configuration conf) throws IOException {
+    long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 10 * 1024);
+    // Guard the division below: zero used to surface as a bare
+    // ArithmeticException and a negative value yields a meaningless map count.
+    if (numBytesToWritePerMap <= 0) {
+      throw new IllegalArgumentException(BYTES_PER_MAP
+          + " must be positive, but was " + numBytesToWritePerMap);
+    }
+    // NOTE(review): when BYTES_PER_MAP is unset, the 10 KiB assumed here is
+    // not written back to conf, while RandomTextMapper.setup falls back to
+    // its own 1 GiB default. Callers should set the key explicitly; confirm
+    // whether the effective value ought to be stored here.
+    long totalBytesToWrite = conf.getLong(TOTAL_BYTES, numBytesToWritePerMap);
+    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
+    if (numMaps == 0 && totalBytesToWrite > 0) {
+      // Less than one map's worth requested: run one map that writes it all.
+      numMaps = 1;
+      conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
+    }
+    conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+    Job job = new Job(conf);
+
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setJobName("random-text-writer");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(RandomTextMapper.class);
+
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    // The output path is expected to be set by the caller;
+    // RandomInputFormat.getSplits reads it via FileOutputFormat.getOutputPath.
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /**
+   * Input format that fabricates placeholder splits instead of reading real
+   * input. Each split yields exactly one record whose key is the split's
+   * file name and whose value is an empty {@link Text}.
+   */
+  public static class RandomInputFormat extends InputFormat<Text, Text> {
+
+    /**
+     * Creates a reader over the path carried by the given split, which is
+     * cast to {@link FileSplit}.
+     */
+    public RecordReader<Text, Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      return new RandomRecordReader(fileSplit.getPath());
+    }
+
+    /**
+     * Builds one file split per requested map (read from
+     * {@link MRJobConfig#NUM_MAPS}, default 1). Split {@code i} is named
+     * {@code dummy-split-i} under the job's output directory, covers the
+     * byte range starting at 0 with length 1, and carries no host list.
+     */
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      final List<InputSplit> splits = new ArrayList<InputSplit>();
+      final Path outputDir = FileOutputFormat.getOutputPath(job);
+      final int splitCount =
+          job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+
+      int index = 0;
+      while (index < splitCount) {
+        Path placeholder = new Path(outputDir, "dummy-split-" + index);
+        splits.add(new FileSplit(placeholder, 0, 1, (String[])null));
+        index++;
+      }
+      return splits;
+    }
+
+    /**
+     * Reader that hands out a single (file name, empty text) record for the
+     * path it was constructed with, then reports end of input.
+     */
+    public static class RandomRecordReader extends RecordReader<Text, Text> {
+      // Path still to be handed out; null once the record has been produced.
+      Path name;
+      // Key of the current record; null until nextKeyValue() succeeds.
+      Text key = null;
+      // Value of every record: always this same empty Text.
+      Text value = new Text();
+
+      public RandomRecordReader(Path p) {
+        this.name = p;
+      }
+
+      /** Does nothing; the path is supplied through the constructor. */
+      public void initialize(InputSplit split,
+                             TaskAttemptContext context)
+      throws IOException, InterruptedException {
+        // intentionally empty
+      }
+
+      /**
+       * Advances to the single record on the first call and returns
+       * {@code true}; every later call returns {@code false}.
+       */
+      public boolean nextKeyValue() {
+        if (name == null) {
+          return false;
+        }
+        Text fileName = new Text();
+        fileName.set(name.getName());
+        key = fileName;
+        name = null;
+        return true;
+      }
+
+      public Text getCurrentKey() {
+        return key;
+      }
+
+      public Text getCurrentValue() {
+        return value;
+      }
+
+      /** Always reports zero progress. */
+      public float getProgress() {
+        return 0.0f;
+      }
+
+      /** Nothing to release. */
+      public void close() {}
+    }
+  }
+
+  public static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
+    
+    private long numBytesToWrite;
+    private int minWordsInKey;
+    private int wordsInKeyRange;
+    private int minWordsInValue;
+    private int wordsInValueRange;
+    private Random random = new Random();
+    
+    /**
+     * Reads from the job configuration the number of bytes this map should
+     * write (default 1 GiB) and the word-count bounds for keys (defaults 5
+     * and 10) and values (defaults 10 and 100). The bounds are stored as a
+     * minimum plus a range, where range = maximum - minimum.
+     */
+    public void setup(Context context) {
+      final Configuration jobConf = context.getConfiguration();
+
+      numBytesToWrite = jobConf.getLong(BYTES_PER_MAP, 1024L * 1024 * 1024);
+
+      final int keyFloor = jobConf.getInt(MIN_KEY, 5);
+      final int keyCeiling = jobConf.getInt(MAX_KEY, 10);
+      minWordsInKey = keyFloor;
+      wordsInKeyRange = keyCeiling - keyFloor;
+
+      final int valueFloor = jobConf.getInt(MIN_VALUE, 10);
+      final int valueCeiling = jobConf.getInt(MAX_VALUE, 100);
+      minWordsInValue = valueFloor;
+      wordsInValueRange = valueCeiling - valueFloor;
+    }
+    
+    /**
+     * Emits random key/value sentences until the remaining byte budget
+     * ({@code numBytesToWrite}) is no longer positive. The incoming key and
+     * value are not used. For each record the byte and record counters are
+     * updated, and the task status is refreshed every 200 records.
+     */
+    public void map(Text key, Text value,
+                    Context context) throws IOException,InterruptedException {
+      int recordsWritten = 0;
+      while (numBytesToWrite > 0) {
+        // Pick both word counts before generating any text.
+        int keyWordCount = minWordsInKey;
+        if (wordsInKeyRange != 0) {
+          keyWordCount += random.nextInt(wordsInKeyRange);
+        }
+        int valueWordCount = minWordsInValue;
+        if (wordsInValueRange != 0) {
+          valueWordCount += random.nextInt(wordsInValueRange);
+        }
+
+        final Text keyWords = generateSentence(keyWordCount);
+        final Text valueWords = generateSentence(valueWordCount);
+        context.write(keyWords, valueWords);
+
+        // Account for what was just written.
+        final int recordBytes = keyWords.getLength() + valueWords.getLength();
+        numBytesToWrite -= recordBytes;
+        context.getCounter(Counters.BYTES_WRITTEN).increment(recordBytes);
+        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+
+        recordsWritten++;
+        if (recordsWritten % 200 == 0) {
+          context.setStatus("wrote record " + recordsWritten + ". " + 
+                             numBytesToWrite + " bytes left.");
+        }
+      }
+      context.setStatus("done with " + recordsWritten + " records.");
+    }
+    
+    /**
+     * Builds a sentence of {@code noWords} entries drawn at random from
+     * {@code words}. Every word, including the last, is followed by a single
+     * space, so a non-empty sentence ends with a trailing space.
+     *
+     * @param noWords number of words to draw; zero or less gives empty text
+     * @return the sentence wrapped in a new {@link Text}
+     */
+    private Text generateSentence(int noWords) {
+      // The buffer never leaves this method, so the unsynchronized
+      // StringBuilder replaces StringBuffer without changing the output.
+      StringBuilder sentence = new StringBuilder();
+      for (int i = 0; i < noWords; ++i) {
+        sentence.append(words[random.nextInt(words.length)]).append(' ');
+      }
+      return new Text(sentence.toString());
+    }
+
+    private static String[] words = {
+      "diurnalness", "Homoiousian",
+      "spiranthic", "tetragynian",
+      "silverhead", "ungreat",
+      "lithograph", "exploiter",
+      "physiologian", "by",
+      "hellbender", "Filipendula",
+      "undeterring", "antiscolic",
+      "pentagamist", "hypoid",
+      "cacuminal", "sertularian",
+      "schoolmasterism", "nonuple",
+      "gallybeggar", "phytonic",
+      "swearingly", "nebular",
+      "Confervales", "thermochemically",
+      "characinoid", "cocksuredom",
+      "fallacious", "feasibleness",
+      "debromination", "playfellowship",
+      "tramplike", "testa",
+      "participatingly", "unaccessible",
+      "bromate", "experientialist",
+      "roughcast", "docimastical",
+      "choralcelo", "blightbird",
+      "peptonate", "sombreroed",
+      "unschematized", "antiabolitionist",
+      "besagne", "mastication",
+      "bromic", "sviatonosite",
+      "cattimandoo", "metaphrastical",
+      "endotheliomyoma", "hysterolysis",
+      "unfulminated", "Hester",
+      "oblongly", "blurredness",
+      "authorling", "chasmy",
+      "Scorpaenidae", "toxihaemia",
+      "Dictograph", "Quakerishly",
+      "deaf", "timbermonger",
+      "strammel", "Thraupidae",
+      "seditious", "plerome",
+      "Arneb", "eristically",
+      "serpentinic", "glaumrie",
+      "socioromantic", "apocalypst",
+      "tartrous", "Bassaris",
+      "angiolymphoma", "horsefly",
+      "kenno", "astronomize",
+      "euphemious", "arsenide",
+      "untongued", "parabolicness",
+      "uvanite", "helpless",
+      "gemmeous", "stormy",
+      "templar", "erythrodextrin",
+      "comism", "interfraternal",
+      "preparative", "parastas",
+      "frontoorbital", "Ophiosaurus",
+      "diopside", "serosanguineous",
+      "ununiformly", "karyological",
+      "collegian", "allotropic",
+      "depravity", "amylogenesis",
+      "reformatory", "epidymides",
+      "pleurotropous", "trillium",
+      "dastardliness", "coadvice",
+      "embryotic", "benthonic",
+      "pomiferous", "figureheadship",
+      "Megaluridae", "Harpa",
+      "frenal", "commotion",
+      "abthainry", "cobeliever",
+      "manilla", "spiciferous",
+      "nativeness", "obispo",
+      "monilioid", "biopsic",
+      "valvula", "enterostomy",
+      "planosubulate", "pterostigma",
+      "lifter", "triradiated",
+      "venialness", "tum",
+      "archistome", "tautness",
+      "unswanlike", "antivenin",
+      "Lentibulariaceae", "Triphora",
+      "angiopathy", "anta",
+      "Dawsonia", "becomma",
+      "Yannigan", "winterproof",
+      "antalgol", "harr",
+      "underogating", "ineunt",
+      "cornberry", "flippantness",
+      "scyphostoma", "approbation",
+      "Ghent", "Macraucheniidae",
+      "scabbiness", "unanatomized",
+      "photoelasticity", "eurythermal",
+      "enation", "prepavement",
+      "flushgate", "subsequentially",
+      "Edo", "antihero",
+      "Isokontae", "unforkedness",
+      "porriginous", "daytime",
+      "nonexecutive", "trisilicic",
+      "morphiomania", "paranephros",
+      "botchedly", "impugnation",
+      "Dodecatheon", "obolus",
+      "unburnt", "provedore",
+      "Aktistetae", "superindifference",
+      "Alethea", "Joachimite",
+      "cyanophilous", "chorograph",
+      "brooky", "figured",
+      "periclitation", "quintette",
+      "hondo", "ornithodelphous",
+      "unefficient", "pondside",
+      "bogydom", "laurinoxylon",
+      "Shiah", "unharmed",
+      "cartful", "noncrystallized",
+      "abusiveness", "cromlech",
+      "japanned", "rizzomed",
+      "underskin", "adscendent",
+      "allectory", "gelatinousness",
+      "volcano", "uncompromisingly",
+      "cubit", "idiotize",
+      "unfurbelowed", "undinted",
+      "magnetooptics", "Savitar",
+      "diwata", "ramosopalmate",
+      "Pishquow", "tomorn",
+      "apopenptic", "Haversian",
+      "Hysterocarpus", "ten",
+      "outhue", "Bertat",
+      "mechanist", "asparaginic",
+      "velaric", "tonsure",
+      "bubble", "Pyrales",
+      "regardful", "glyphography",
+      "calabazilla", "shellworker",
+      "stradametrical", "havoc",
+      "theologicopolitical", "sawdust",
+      "diatomaceous", "jajman",
+      "temporomastoid", "Serrifera",
+      "Ochnaceae", "aspersor",
+      "trailmaking", "Bishareen",
+      "digitule", "octogynous",
+      "epididymitis", "smokefarthings",
+      "bacillite", "overcrown",
+      "mangonism", "sirrah",
+      "undecorated", "psychofugal",
+      "bismuthiferous", "rechar",
+      "Lemuridae", "frameable",
+      "thiodiazole", "Scanic",
+      "sportswomanship", "interruptedness",
+      "admissory", "osteopaedion",
+      "tingly", "tomorrowness",
+      "ethnocracy", "trabecular",
+      "vitally", "fossilism",
+      "adz", "metopon",
+      "prefatorial", "expiscate",
+      "diathermacy", "chronist",
+      "nigh", "generalizable",
+      "hysterogen", "aurothiosulphuric",
+      "whitlowwort", "downthrust",
+      "Protestantize", "monander",
+      "Itea", "chronographic",
+      "silicize", "Dunlop",
+      "eer", "componental",
+      "spot", "pamphlet",
+      "antineuritic", "paradisean",
+      "interruptor", "debellator",
+      "overcultured", "Florissant",
+      "hyocholic", "pneumatotherapy",
+      "tailoress", "rave",
+      "unpeople", "Sebastian",
+      "thermanesthesia", "Coniferae",
+      "swacking", "posterishness",
+      "ethmopalatal", "whittle",
+      "analgize", "scabbardless",
+      "naught", "symbiogenetically",
+      "trip", "parodist",
+      "columniform", "trunnel",
+      "yawler", "goodwill",
+      "pseudohalogen", "swangy",
+      "cervisial", "mediateness",
+      "genii", "imprescribable",
+      "pony", "consumptional",
+      "carposporangial", "poleax",
+      "bestill", "subfebrile",
+      "sapphiric", "arrowworm",
+      "qualminess", "ultraobscure",
+      "thorite", "Fouquieria",
+      "Bermudian", "prescriber",
+      "elemicin", "warlike",
+      "semiangle", "rotular",
+      "misthread", "returnability",
+      "seraphism", "precostal",
+      "quarried", "Babylonism",
+      "sangaree", "seelful",
+      "placatory", "pachydermous",
+      "bozal", "galbulus",
+      "spermaphyte", "cumbrousness",
+      "pope", "signifier",
+      "Endomycetaceae", "shallowish",
+      "sequacity", "periarthritis",
+      "bathysphere", "pentosuria",
+      "Dadaism", "spookdom",
+      "Consolamentum", "afterpressure",
+      "mutter", "louse",
+      "ovoviviparous", "corbel",
+      "metastoma", "biventer",
+      "Hydrangea", "hogmace",
+      "seizing", "nonsuppressed",
+      "oratorize", "uncarefully",
+      "benzothiofuran", "penult",
+      "balanocele", "macropterous",
+      "dishpan", "marten",
+      "absvolt", "jirble",
+      "parmelioid", "airfreighter",
+      "acocotl", "archesporial",
+      "hypoplastral", "preoral",
+      "quailberry", "cinque",
+      "terrestrially", "stroking",
+      "limpet", "moodishness",
+      "canicule", "archididascalian",
+      "pompiloid", "overstaid",
+      "introducer", "Italical",
+      "Christianopaganism", "prescriptible",
+      "subofficer", "danseuse",
+      "cloy", "saguran",
+      "frictionlessly", "deindividualization",
+      "Bulanda", "ventricous",
+      "subfoliar", "basto",
+      "scapuloradial", "suspend",
+      "stiffish", "Sphenodontidae",
+      "eternal", "verbid",
+      "mammonish", "upcushion",
+      "barkometer", "concretion",
+      "preagitate", "incomprehensible",
+      "tristich", "visceral",
+      "hemimelus", "patroller",
+      "stentorophonic", "pinulus",
+      "kerykeion", "brutism",
+      "monstership", "merciful",
+      "overinstruct", "defensibly",
+      "bettermost", "splenauxe",
+      "Mormyrus", "unreprimanded",
+      "taver", "ell",
+      "proacquittal", "infestation",
+      "overwoven", "Lincolnlike",
+      "chacona", "Tamil",
+      "classificational", "lebensraum",
+      "reeveland", "intuition",
+      "Whilkut", "focaloid",
+      "Eleusinian", "micromembrane",
+      "byroad", "nonrepetition",
+      "bacterioblast", "brag",
+      "ribaldrous", "phytoma",
+      "counteralliance", "pelvimetry",
+      "pelf", "relaster",
+      "thermoresistant", "aneurism",
+      "molossic", "euphonym",
+      "upswell", "ladhood",
+      "phallaceous", "inertly",
+      "gunshop", "stereotypography",
+      "laryngic", "refasten",
+      "twinling", "oflete",
+      "hepatorrhaphy", "electrotechnics",
+      "cockal", "guitarist",
+      "topsail", "Cimmerianism",
+      "larklike", "Llandovery",
+      "pyrocatechol", "immatchable",
+      "chooser", "metrocratic",
+      "craglike", "quadrennial",
+      "nonpoisonous", "undercolored",
+      "knob", "ultratense",
+      "balladmonger", "slait",
+      "sialadenitis", "bucketer",
+      "magnificently", "unstipulated",
+      "unscourged", "unsupercilious",
+      "packsack", "pansophism",
+      "soorkee", "percent",
+      "subirrigate", "champer",
+      "metapolitics", "spherulitic",
+      "involatile", "metaphonical",
+      "stachyuraceous", "speckedness",
+      "bespin", "proboscidiform",
+      "gul", "squit",
+      "yeelaman", "peristeropode",
+      "opacousness", "shibuichi",
+      "retinize", "yote",
+      "misexposition", "devilwise",
+      "pumpkinification", "vinny",
+      "bonze", "glossing",
+      "decardinalize", "transcortical",
+      "serphoid", "deepmost",
+      "guanajuatite", "wemless",
+      "arval", "lammy",
+      "Effie", "Saponaria",
+      "tetrahedral", "prolificy",
+      "excerpt", "dunkadoo",
+      "Spencerism", "insatiately",
+      "Gilaki", "oratorship",
+      "arduousness", "unbashfulness",
+      "Pithecolobium", "unisexuality",
+      "veterinarian", "detractive",
+      "liquidity", "acidophile",
+      "proauction", "sural",
+      "totaquina", "Vichyite",
+      "uninhabitedness", "allegedly",
+      "Gothish", "manny",
+      "Inger", "flutist",
+      "ticktick", "Ludgatian",
+      "homotransplant", "orthopedical",
+      "diminutively", "monogoneutic",
+      "Kenipsim", "sarcologist",
+      "drome", "stronghearted",
+      "Fameuse", "Swaziland",
+      "alen", "chilblain",
+      "beatable", "agglomeratic",
+      "constitutor", "tendomucoid",
+      "porencephalous", "arteriasis",
+      "boser", "tantivy",
+      "rede", "lineamental",
+      "uncontradictableness", "homeotypical",
+      "masa", "folious",
+      "dosseret", "neurodegenerative",
+      "subtransverse", "Chiasmodontidae",
+      "palaeotheriodont", "unstressedly",
+      "chalcites", "piquantness",
+      "lampyrine", "Aplacentalia",
+      "projecting", "elastivity",
+      "isopelletierin", "bladderwort",
+      "strander", "almud",
+      "iniquitously", "theologal",
+      "bugre", "chargeably",
+      "imperceptivity", "meriquinoidal",
+      "mesophyte", "divinator",
+      "perfunctory", "counterappellant",
+      "synovial", "charioteer",
+      "crystallographical", "comprovincial",
+      "infrastapedial", "pleasurehood",
+      "inventurous", "ultrasystematic",
+      "subangulated", "supraoesophageal",
+      "Vaishnavism", "transude",
+      "chrysochrous", "ungrave",
+      "reconciliable", "uninterpleaded",
+      "erlking", "wherefrom",
+      "aprosopia", "antiadiaphorist",
+      "metoxazine", "incalculable",
+      "umbellic", "predebit",
+      "foursquare", "unimmortal",
+      "nonmanufacture", "slangy",
+      "predisputant", "familist",
+      "preaffiliate", "friarhood",
+      "corelysis", "zoonitic",
+      "halloo", "paunchy",
+      "neuromimesis", "aconitine",
+      "hackneyed", "unfeeble",
+      "cubby", "autoschediastical",
+      "naprapath", "lyrebird",
+      "inexistency", "leucophoenicite",
+      "ferrogoslarite", "reperuse",
+      "uncombable", "tambo",
+      "propodiale", "diplomatize",
+      "Russifier", "clanned",
+      "corona", "michigan",
+      "nonutilitarian", "transcorporeal",
+      "bought", "Cercosporella",
+      "stapedius", "glandularly",
+      "pictorially", "weism",
+      "disilane", "rainproof",
+      "Caphtor", "scrubbed",
+      "oinomancy", "pseudoxanthine",
+      "nonlustrous", "redesertion",
+      "Oryzorictinae", "gala",
+      "Mycogone", "reappreciate",
+      "cyanoguanidine", "seeingness",
+      "breadwinner", "noreast",
+      "furacious", "epauliere",
+      "omniscribent", "Passiflorales",
+      "uninductive", "inductivity",
+      "Orbitolina", "Semecarpus",
+      "migrainoid", "steprelationship",
+      "phlogisticate", "mesymnion",
+      "sloped", "edificator",
+      "beneficent", "culm",
+      "paleornithology", "unurban",
+      "throbless", "amplexifoliate",
+      "sesquiquintile", "sapience",
+      "astucious", "dithery",
+      "boor", "ambitus",
+      "scotching", "uloid",
+      "uncompromisingness", "hoove",
+      "waird", "marshiness",
+      "Jerusalem", "mericarp",
+      "unevoked", "benzoperoxide",
+      "outguess", "pyxie",
+      "hymnic", "euphemize",
+      "mendacity", "erythremia",
+      "rosaniline", "unchatteled",
+      "lienteria", "Bushongo",
+      "dialoguer", "unrepealably",
+      "rivethead", "antideflation",
+      "vinegarish", "manganosiderite",
+      "doubtingness", "ovopyriform",
+      "Cephalodiscus", "Muscicapa",
+      "Animalivora", "angina",
+      "planispheric", "ipomoein",
+      "cuproiodargyrite", "sandbox",
+      "scrat", "Munnopsidae",
+      "shola", "pentafid",
+      "overstudiousness", "times",
+      "nonprofession", "appetible",
+      "valvulotomy", "goladar",
+      "uniarticular", "oxyterpene",
+      "unlapsing", "omega",
+      "trophonema", "seminonflammable",
+      "circumzenithal", "starer",
+      "depthwise", "liberatress",
+      "unleavened", "unrevolting",
+      "groundneedle", "topline",
+      "wandoo", "umangite",
+      "ordinant", "unachievable",
+      "oversand", "snare",
+      "avengeful", "unexplicit",
+      "mustafina", "sonable",
+      "rehabilitative", "eulogization",
+      "papery", "technopsychology",
+      "impressor", "cresylite",
+      "entame", "transudatory",
+      "scotale", "pachydermatoid",
+      "imaginary", "yeat",
+      "slipped", "stewardship",
+      "adatom", "cockstone",
+      "skyshine", "heavenful",
+      "comparability", "exprobratory",
+      "dermorhynchous", "parquet",
+      "cretaceous", "vesperal",
+      "raphis", "undangered",
+      "Glecoma", "engrain",
+      "counteractively", "Zuludom",
+      "orchiocatabasis", "Auriculariales",
+      "warriorwise", "extraorganismal",
+      "overbuilt", "alveolite",
+      "tetchy", "terrificness",
+      "widdle", "unpremonished",
+      "rebilling", "sequestrum",
+      "equiconvex", "heliocentricism",
+      "catabaptist", "okonite",
+      "propheticism", "helminthagogic",
+      "calycular", "giantly",
+      "wingable", "golem",
+      "unprovided", "commandingness",
+      "greave", "haply",
+      "doina", "depressingly",
+      "subdentate", "impairment",
+      "decidable", "neurotrophic",
+      "unpredict", "bicorporeal",
+      "pendulant", "flatman",
+      "intrabred", "toplike",
+      "Prosobranchiata", "farrantly",
+      "toxoplasmosis", "gorilloid",
+      "dipsomaniacal", "aquiline",
+      "atlantite", "ascitic",
+      "perculsive", "prospectiveness",
+      "saponaceous", "centrifugalization",
+      "dinical", "infravaginal",
+      "beadroll", "affaite",
+      "Helvidian", "tickleproof",
+      "abstractionism", "enhedge",
+      "outwealth", "overcontribute",
+      "coldfinch", "gymnastic",
+      "Pincian", "Munychian",
+      "codisjunct", "quad",
+      "coracomandibular", "phoenicochroite",
+      "amender", "selectivity",
+      "putative", "semantician",
+      "lophotrichic", "Spatangoidea",
+      "saccharogenic", "inferent",
+      "Triconodonta", "arrendation",
+      "sheepskin", "taurocolla",
+      "bunghole", "Machiavel",
+      "triakistetrahedral", "dehairer",
+      "prezygapophysial", "cylindric",
+      "pneumonalgia", "sleigher",
+      "emir", "Socraticism",
+      "licitness", "massedly",
+      "instructiveness", "sturdied",
+      "redecrease", "starosta",
+      "evictor", "orgiastic",
+      "squdge", "meloplasty",
+      "Tsonecan", "repealableness",
+      "swoony", "myesthesia",
+      "molecule", "autobiographist",
+      "reciprocation", "refective",
+      "unobservantness", "tricae",
+      "ungouged", "floatability",
+      "Mesua", "fetlocked",
+      "chordacentrum", "sedentariness",
+      "various", "laubanite",
+      "nectopod", "zenick",
+      "sequentially", "analgic",
+      "biodynamics", "posttraumatic",
+      "nummi", "pyroacetic",
+      "bot", "redescend",
+      "dispermy", "undiffusive",
+      "circular", "trillion",
+      "Uraniidae", "ploration",
+      "discipular", "potentness",
+      "sud", "Hu",
+      "Eryon", "plugger",
+      "subdrainage", "jharal",
+      "abscission", "supermarket",
+      "countergabion", "glacierist",
+      "lithotresis", "minniebush",
+      "zanyism", "eucalypteol",
+      "sterilely", "unrealize",
+      "unpatched", "hypochondriacism",
+      "critically", "cheesecutter",
+     };
+  }
+
+  /**
+   * Configures the job, runs it and blocks until it finishes. The first
+   * argument is used as the job's output path; the start time, the end time
+   * and the elapsed wall-clock seconds are printed to standard output.
+   *
+   * @param args command-line arguments; <code>args[0]</code> is the output path
+   * @return the value of <code>printUsage()</code> when no arguments are
+   *         given, otherwise 0 if the job succeeded and 1 if it did not
+   * @throws Exception propagated from job creation or job execution
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length == 0) {
+      return printUsage();
+    }
+
+    final Job job = createJob(getConf());
+    FileOutputFormat.setOutputPath(job, new Path(args[0]));
+
+    final Date startTime = new Date();
+    System.out.println("Job started: " + startTime);
+    final boolean succeeded = job.waitForCompletion(true);
+    final Date endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+
+    final long elapsedSeconds =
+        (endTime.getTime() - startTime.getTime()) / 1000;
+    System.out.println("The job took " + elapsedSeconds + " seconds.");
+
+    return succeeded ? 0 : 1;
+  }
+
+  /**
+   * Prints the command synopsis, followed by the generic Hadoop command
+   * options, to standard output.
+   *
+   * @return always 2
+   */
+  static int printUsage() {
+    final String synopsis = "randomtextwriter "
+        + "[-outFormat <output format class>] "
+        + "<output>";
+    System.out.println(synopsis);
+    ToolRunner.printGenericCommandUsage(System.out);
+    return 2;
+  }
+
+  /**
+   * Command-line entry point: runs this tool through ToolRunner and exits
+   * the JVM with the tool's return code.
+   */
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = new Configuration();
+    final int exitCode = ToolRunner.run(conf, new RandomTextWriterJob(), args);
+    System.exit(exitCode);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,274 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing MR framework. Sleeps for a defined period 
+ * of time in mapper and reducer. Generates fake input for map / reduce 
+ * jobs. Note that generated number of input pairs is in the order 
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class SleepJob extends Configured implements Tool {
+  // Configuration keys through which the sleep parameters reach the tasks.
+  //
+  // Number of records each map task reads; the mapper sleeps once per record.
+  public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
+  // Multiplied by the number of reduce tasks to get how many keys each map
+  // task emits; the reducer also divides REDUCE_SLEEP_TIME by it.
+  public static String REDUCE_SLEEP_COUNT = 
+    "mapreduce.sleepjob.reduce.sleep.count";
+  // Map sleep time in milliseconds; divided by MAP_SLEEP_COUNT to get the
+  // sleep per record.
+  public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
+  // Reduce sleep time in milliseconds; divided by REDUCE_SLEEP_COUNT to get
+  // the sleep per reduce() call.
+  public static String REDUCE_SLEEP_TIME = 
+    "mapreduce.sleepjob.reduce.sleep.time";
+
+  /**
+   * Partitioner that assigns a key to the partition given by the key's
+   * integer value modulo the number of partitions.
+   */
+  public static class SleepJobPartitioner extends 
+      Partitioner<IntWritable, NullWritable> {
+    /**
+     * @param k key whose integer value selects the partition
+     * @param v unused
+     * @param numPartitions number of partitions
+     * @return a partition index that is never negative
+     */
+    public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+      // Clear the sign bit so that a negative key cannot produce a negative
+      // (invalid) partition; non-negative keys map exactly as before.
+      return (k.get() & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+  
+  /**
+   * Input split that carries no data: it serializes to nothing, reports a
+   * length of zero and names no locations.
+   */
+  public static class EmptySplit extends InputSplit implements Writable {
+    /** Writes nothing; the split has no state. */
+    public void write(DataOutput out) throws IOException {
+    }
+
+    /** Reads nothing; the split has no state. */
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    /** @return always zero */
+    public long getLength() {
+      return 0L;
+    }
+
+    /** @return a new empty array */
+    public String[] getLocations() {
+      final String[] noLocations = new String[0];
+      return noLocations;
+    }
+  }
+
+  /**
+   * Input format that fabricates its input instead of reading it. Each map
+   * task gets an {@link EmptySplit} and a record reader that produces
+   * <code>MAP_SLEEP_COUNT</code> synthetic records.
+   */
+  public static class SleepInputFormat 
+      extends InputFormat<IntWritable,IntWritable> {
+
+    /**
+     * Creates one empty split per configured map task.
+     *
+     * @param jobContext supplies the configuration holding the map count
+     *                   (<code>MRJobConfig.NUM_MAPS</code>, default 1)
+     * @return a list with one {@link EmptySplit} per map task
+     */
+    public List<InputSplit> getSplits(JobContext jobContext) {
+      List<InputSplit> ret = new ArrayList<InputSplit>();
+      int numSplits = jobContext.getConfiguration().
+                        getInt(MRJobConfig.NUM_MAPS, 1);
+      for (int i = 0; i < numSplits; ++i) {
+        ret.add(new EmptySplit());
+      }
+      return ret;
+    }
+
+    /**
+     * Creates the reader that generates the synthetic records of one map
+     * task. Each record's key is the first key the mapper should emit for
+     * that record and its value is how many consecutive keys to emit. Over
+     * all records of a task the values sum to
+     * <code>REDUCE_SLEEP_COUNT * numReduceTasks</code>.
+     *
+     * @param ignored the split; not used
+     * @param taskContext supplies the configuration and the reduce count
+     * @return a reader producing <code>MAP_SLEEP_COUNT</code> records
+     * @throws IOException if either configured sleep count is negative
+     */
+    public RecordReader<IntWritable,IntWritable> createRecordReader(
+        InputSplit ignored, TaskAttemptContext taskContext)
+        throws IOException {
+      Configuration conf = taskContext.getConfiguration();
+      final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+      if (count < 0) throw new IOException("Invalid map count: " + count);
+      final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
+      if (redcount < 0)
+        throw new IOException("Invalid reduce count: " + redcount);
+      final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+      return new RecordReader<IntWritable,IntWritable>() {
+        private int records = 0;
+        private int emitCount = 0;
+        private IntWritable key = null;
+        private IntWritable value = null;
+        public void initialize(InputSplit split, TaskAttemptContext context) {
+        }
+
+        public boolean nextKeyValue()
+            throws IOException {
+          // A count of zero passes the validation above but would divide
+          // by zero below; there are simply no records to produce.
+          if (count == 0) {
+            return false;
+          }
+          key = new IntWritable();
+          key.set(emitCount);
+          // Spread emitPerMapTask over the records: every record gets the
+          // quotient and the first (emitPerMapTask % count) records get
+          // one extra.
+          int emit = emitPerMapTask / count;
+          if ((emitPerMapTask) % count > records) {
+            ++emit;
+          }
+          emitCount += emit;
+          value = new IntWritable();
+          value.set(emit);
+          return records++ < count;
+        }
+        public IntWritable getCurrentKey() { return key; }
+        public IntWritable getCurrentValue() { return value; }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException {
+          // With nothing to read the reader is complete from the start;
+          // this also avoids the NaN that 0 / 0f would produce.
+          if (count == 0) {
+            return 1.0f;
+          }
+          // records is incremented once more by the final nextKeyValue()
+          // call that returns false, so cap the ratio at 1.
+          return Math.min(1.0f, records / ((float)count));
+        }
+      };
+    }
+  }
+
+  /**
+   * Mapper that sleeps once per input record and then emits the run of
+   * consecutive keys described by that record.
+   */
+  public static class SleepMapper 
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    /**
+     * Reads the map sleep count and derives the sleep per record as
+     * <code>MAP_SLEEP_TIME / MAP_SLEEP_COUNT</code> milliseconds.
+     */
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      // A sleep count of zero would otherwise make this division throw
+      // ArithmeticException; use a zero duration instead.
+      this.mapSleepDuration = mapSleepCount == 0 ? 0 :
+        conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+    }
+
+    /**
+     * Sleeps for the per-record duration, then writes
+     * <code>value.get()</code> keys starting at <code>key.get()</code>,
+     * each paired with the NullWritable singleton.
+     *
+     * @throws IOException if the sleep is interrupted (the interruption is
+     *                     attached as the cause) or the write fails
+     */
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records. 
+      try {
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+            "Interrupted while sleeping").initCause(ex);
+      }
+      ++count;
+      // Emit value.get() consecutive keys starting at key.get(). Over all
+      // records of this task that adds up to reduceSleepCount * numReduce
+      // keys, so each reducer receives reduceSleepCount keys from this map.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+  
+  /**
+   * Reducer that sleeps once per key it receives and produces no output.
+   */
+  public static class SleepReducer  
+      extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+    private long reduceSleepDuration = 100;
+    private int reduceSleepCount = 1;
+    private int count = 0;
+
+    /**
+     * Reads the reduce sleep count and derives the sleep per key as
+     * <code>REDUCE_SLEEP_TIME / REDUCE_SLEEP_COUNT</code> milliseconds.
+     */
+    protected void setup(Context context) 
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.reduceSleepCount =
+        conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+      // A sleep count of zero would otherwise make this division throw
+      // ArithmeticException; use a zero duration instead.
+      this.reduceSleepDuration = reduceSleepCount == 0 ? 0 :
+        conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
+    }
+
+    /**
+     * Sleeps for the per-key duration; the values are not read and nothing
+     * is written.
+     *
+     * @throws IOException if the sleep is interrupted (the interruption is
+     *                     attached as the cause)
+     */
+    public void reduce(IntWritable key, Iterable<NullWritable> values,
+                       Context context)
+      throws IOException {
+      try {
+        context.setStatus("Sleeping... (" +
+            (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+        Thread.sleep(reduceSleepDuration);
+      
+      }
+      catch (InterruptedException ex) {
+        throw (IOException)new IOException(
+          "Interrupted while sleeping").initCause(ex);
+      }
+      count++;
+    }
+  }
+
+  /**
+   * Command-line entry point: runs SleepJob through ToolRunner and exits
+   * the JVM with the tool's return code.
+   */
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = new Configuration();
+    final int exitCode = ToolRunner.run(conf, new SleepJob(), args);
+    System.exit(exitCode);
+  }
+
+  /**
+   * Stores the sleep parameters in this tool's configuration and builds a
+   * job wired to the sleep mapper, reducer, input format and partitioner.
+   * Note that the configuration returned by <code>getConf()</code> is
+   * modified in place.
+   *
+   * @param numMapper number of map tasks (one empty split each)
+   * @param numReducer number of reduce tasks
+   * @param mapSleepTime value stored under <code>MAP_SLEEP_TIME</code>
+   * @param mapSleepCount value stored under <code>MAP_SLEEP_COUNT</code>
+   * @param reduceSleepTime value stored under <code>REDUCE_SLEEP_TIME</code>
+   * @param reduceSleepCount value stored under <code>REDUCE_SLEEP_COUNT</code>
+   * @return the configured job, not yet submitted
+   * @throws IOException if the job cannot be created
+   */
+  public Job createJob(int numMapper, int numReducer, 
+                       long mapSleepTime, int mapSleepCount, 
+                       long reduceSleepTime, int reduceSleepCount) 
+      throws IOException {
+    Configuration conf = getConf();
+    conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    // The job is named once here; the original set a name at creation and
+    // then overwrote it with this one.
+    Job job = Job.getInstance(conf, "Sleep job");
+    job.setJarByClass(SleepJob.class);
+    job.setNumReduceTasks(numReducer);
+    job.setMapperClass(SleepMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setReducerClass(SleepReducer.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    job.setInputFormatClass(SleepInputFormat.class);
+    job.setPartitionerClass(SleepJobPartitioner.class);
+    job.setSpeculativeExecution(false);
+    // SleepInputFormat generates its own records and never reads this path.
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    return job;
+  }
+
+  /** Prints the command-line synopsis to standard error and returns 2. */
+  private static int printUsage() {
+    System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+        " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+        " [-recordt recordSleepTime (msec)]");
+    ToolRunner.printGenericCommandUsage(System.err);
+    return 2;
+  }
+
+  /**
+   * Parses the command line, derives the number of sleeps per task from the
+   * total and per-record sleep times, and runs the job to completion.
+   * Unrecognized arguments are ignored.
+   *
+   * @param args options <code>-m</code>, <code>-r</code>, <code>-mt</code>,
+   *             <code>-rt</code> and <code>-recordt</code>, each followed by
+   *             a numeric value
+   * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are
+   *         missing or invalid
+   * @throws Exception propagated from number parsing, job creation or job
+   *                   execution
+   */
+  public int run(String[] args) throws Exception {
+
+    if(args.length < 1) {
+      return printUsage();
+    }
+
+    int numMapper = 1, numReducer = 1;
+    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+    int mapSleepCount = 1, reduceSleepCount = 1;
+
+    for(int i=0; i < args.length; i++ ) {
+      String opt = args[i];
+      boolean isOption = opt.equals("-m") || opt.equals("-r")
+          || opt.equals("-mt") || opt.equals("-rt")
+          || opt.equals("-recordt");
+      if (!isOption) {
+        continue;
+      }
+      // Every option takes a value; report a missing one instead of
+      // running off the end of the array.
+      if (i + 1 >= args.length) {
+        System.err.println("Missing value for option " + opt);
+        return printUsage();
+      }
+      String val = args[++i];
+      if(opt.equals("-m")) {
+        numMapper = Integer.parseInt(val);
+      }
+      else if(opt.equals("-r")) {
+        numReducer = Integer.parseInt(val);
+      }
+      else if(opt.equals("-mt")) {
+        mapSleepTime = Long.parseLong(val);
+      }
+      else if(opt.equals("-rt")) {
+        reduceSleepTime = Long.parseLong(val);
+      }
+      else {
+        recSleepTime = Long.parseLong(val);
+      }
+    }
+
+    // A non-positive record sleep time would turn the divisions below into
+    // Infinity or NaN and yield a count of Integer.MAX_VALUE or 0.
+    if (recSleepTime <= 0) {
+      System.err.println("recordSleepTime must be > 0");
+      return printUsage();
+    }
+    // Negative sleep times would produce negative counts, which the tasks
+    // reject only after the job has been submitted.
+    if (mapSleepTime < 0 || reduceSleepTime < 0) {
+      System.err.println("mapSleepTime and reduceSleepTime must be >= 0");
+      return printUsage();
+    }
+
+    // sleep for *SleepTime duration in Task by recSleepTime per record
+    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+    Job job = createJob(numMapper, numReducer, mapSleepTime,
+                mapSleepCount, reduceSleepTime, reduceSleepCount);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,288 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapreduce.ID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+public class MapReduceChildJVM {
+  // Separator placed between entries when this class builds CLASSPATH,
+  // LD_LIBRARY_PATH and java.library.path values; read from the
+  // "path.separator" system property.
+  private static final String SYSTEM_PATH_SEPARATOR = 
+    System.getProperty("path.separator");
+
+  // Logger for this class.
+  private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class);
+
+  /** Returns the user-logs directory located under the given base log directory. */
+  private static File getUserLogDir(String baseLogDir) {
+    final File userLogDir = new File(baseLogDir, TaskLog.USERLOGS_DIR_NAME);
+    return userLogDir;
+  }
+
+  /** Returns the directory, named after the job id, inside the user-logs directory. */
+  private static File getJobDir(String baseLogDir, JobID jobid) {
+    final File userLogDir = getUserLogDir(baseLogDir);
+    return new File(userLogDir, jobid.toString());
+  }
+
+  /**
+   * Returns the log directory of a task attempt: a directory named after
+   * the attempt id, with ".cleanup" appended for a cleanup attempt, inside
+   * the attempt's job directory.
+   */
+  private static File getAttemptDir(String baseLogDir, TaskAttemptID taskid,
+      boolean isCleanup) {
+    final File jobDir = getJobDir(baseLogDir, taskid.getJobID());
+    final String attemptDirName;
+    if (isCleanup) {
+      attemptDirName = taskid + ".cleanup";
+    } else {
+      attemptDirName = taskid + "";
+    }
+    return new File(jobDir, attemptDirName);
+  }
+
+  /**
+   * Returns the file, named after the given log kind, inside the attempt's
+   * log directory.
+   */
+  private static File getTaskLogFile(String baseLogDir, TaskAttemptID taskid,
+      boolean isCleanup, LogName filter) {
+    final File attemptDir = getAttemptDir(baseLogDir, taskid, isCleanup);
+    final String logFileName = filter.toString();
+    return new File(attemptDir, logFileName);
+  }
+
+  /**
+   * Returns the user-supplied environment specification for a child task.
+   *
+   * @param jobConf job configuration to read from
+   * @param isMap true for a map task, false for a reduce task
+   * @return the map- or reduce-specific setting if present, otherwise the
+   *         setting shared by all tasks; null when neither is configured
+   */
+  private static String getChildEnv(JobConf jobConf, boolean isMap) {
+    // Setting shared by all tasks, used when no task-type-specific value
+    // exists. The constant is referenced through the class, not through
+    // the jobConf instance.
+    String sharedEnv = jobConf.get(JobConf.MAPRED_TASK_ENV);
+    String specificKey = isMap
+        ? JobConf.MAPRED_MAP_TASK_ENV
+        : JobConf.MAPRED_REDUCE_TASK_ENV;
+    return jobConf.get(specificKey, sharedEnv);
+  }
+
+  /**
+   * Populates the environment of the child task JVM.
+   *
+   * Sets CLASSPATH (the given entries ahead of any existing value),
+   * LD_LIBRARY_PATH, HADOOP_ROOT_LOGGER, HADOOP_CLIENT_OPTS, the variables
+   * requested by the user in the job configuration, a placeholder JVM_PID
+   * and the work-directory variable.
+   *
+   * @param env environment map to update in place
+   * @param classPaths entries to put at the front of CLASSPATH
+   * @param pwd directory appended to LD_LIBRARY_PATH
+   * @param nmLdLibraryPath value placed first in LD_LIBRARY_PATH
+   * @param task task whose configuration, id and type are consulted
+   * @param applicationTokensFile not used by this method
+   */
+  public static void setVMEnv(Map<CharSequence,CharSequence> env,
+      List<String> classPaths, String pwd, String nmLdLibraryPath, Task task,
+      CharSequence applicationTokensFile) {
+
+    JobConf conf = task.conf;
+
+    // Add classpath.
+    CharSequence cp = env.get("CLASSPATH");
+    String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+    if (null == cp) {
+      env.put("CLASSPATH", classpath);
+    } else {
+      env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp);
+    }
+
+    /////// Environmental variable LD_LIBRARY_PATH
+    StringBuilder ldLibraryPath = new StringBuilder();
+
+    ldLibraryPath.append(nmLdLibraryPath);
+    ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+    ldLibraryPath.append(pwd);
+    env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+    /////// Environmental variable LD_LIBRARY_PATH
+
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+
+    // TODO: The following is useful for instance in streaming tasks. Should be
+    // set in ApplicationMaster's env by the RM.
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    // FIXME: don't think this is also needed given we already set java
+    // properties.
+    long logSize = TaskLog.getTaskLogLength(conf);
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + task.getTaskID()
+                       + " -Dhadoop.tasklog.iscleanup=" + task.isTaskCleanupTask()
+                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+
+    // add the env variables passed by the user
+    String mapredChildEnv = getChildEnv(conf, task.isMapTask());
+    if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+      String childEnvs[] = mapredChildEnv.split(",");
+      for (String cEnv : childEnvs) {
+        // Split on the first '=' only, so that a value may itself contain
+        // '=' and an empty value ("X=") is still accepted.
+        String[] parts = cEnv.split("=", 2);
+        if (parts.length < 2 || parts[0].length() == 0) {
+          // No '=' or no variable name: nothing sensible can be set.
+          LOG.warn("Ignoring malformed child environment entry: " + cEnv);
+          continue;
+        }
+        String name = parts[0];
+        // Value substituted for "$NAME" inside the user's value, in order
+        // of preference:
+        //  1. what has been configured for the child so far,
+        //     example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+        //  2. this process's own environment, example PATH=$PATH:/tmp
+        //  3. the empty string when the variable is not present anywhere,
+        //     example X=$X:/tmp or X=/tmp
+        CharSequence configured = env.get(name);
+        String inherited;
+        if (configured != null) {
+          inherited = configured.toString();
+        } else {
+          inherited = System.getenv(name); // Get from NM?
+          if (inherited == null) {
+            inherited = "";
+          }
+        }
+        env.put(name, parts[1].replace("$" + name, inherited));
+      }
+    }
+
+    // TODO: Put a random pid in env for now.
+    // Long term we will need to get it from the Child
+    env.put("JVM_PID", "12344");
+
+    env.put(Constants.HADOOP_WORK_DIR, "."); // This should work. TODO: Find
+                                              // why the var is introduced. Not
+                                              // used in tests, for e.g.
+  }
+
+  /**
+   * Returns the JVM options for a child task: the map- or reduce-specific
+   * setting if present, otherwise the setting shared by all tasks, otherwise
+   * the built-in default.
+   */
+  private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
+    final String sharedOpts = jobConf.get(
+        JobConf.MAPRED_TASK_JAVA_OPTS,
+        JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS);
+    final String specificKey;
+    if (isMapTask) {
+      specificKey = JobConf.MAPRED_MAP_TASK_JAVA_OPTS;
+    } else {
+      specificKey = JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS;
+    }
+    return jobConf.get(specificKey, sharedOpts);
+  }
+
+  /**
+   * Appends the logging-related system properties to the JVM arguments: the
+   * log directory, the root logger, and the task id, cleanup flag and total
+   * log file size of the task log.
+   */
+  private static void setupLog4jProperties(Vector<CharSequence> vargs,
+      long logSize, String hadoopLogDir, Task task) {
+    final String[] logProperties = {
+        "-Dhadoop.log.dir=" + hadoopLogDir,
+        "-Dhadoop.root.logger=DEBUG,TLA",
+        "-Dhadoop.tasklog.taskid=" + task.getTaskID(),
+        "-Dhadoop.tasklog.iscleanup=" + task.isTaskCleanupTask(),
+        "-Dhadoop.tasklog.totalLogFileSize=" + logSize
+    };
+    for (String property : logProperties) {
+      vargs.add(property);
+    }
+  }
+
+  /**
+   * Builds the shell command that launches the child JVM of a task attempt.
+   *
+   * The command creates the "work" directory and the attempt's log
+   * directory, then starts java with the user's JVM options, the library
+   * path, the optional temp directory, the logging properties, optional
+   * profiling parameters, the child main class and its arguments, and
+   * redirects standard output and standard error to the attempt's log files.
+   *
+   * @param taskAttemptListenerAddr address the child is told to contact
+   * @param task task to launch; supplies the configuration, id and type
+   * @param javaHome Java installation whose bin/java is executed
+   * @param workDir directory added to java.library.path
+   * @param logDir base log directory
+   * @param childTmpDir value for java.io.tmpdir, or null to leave it unset
+   * @param jvmID id whose numeric value is passed as the last argument
+   * @return a list holding the complete command as a single string
+   */
+  public static List<CharSequence> getVMCommand(
+      InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome,
+      String workDir, String logDir, String childTmpDir, ID jvmID) {
+
+    TaskAttemptID attemptID = task.getTaskID();
+    JobConf conf = task.conf;
+
+    Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+
+    vargs.add(javaHome + "/bin/java");
+
+    // Add child (task) java-vm options.
+    //
+    // The following symbols if present in mapred.{map|reduce}.child.java.opts 
+    // value are replaced:
+    // + @taskid@ is interpolated with value of TaskID.
+    // Other occurrences of @ will not be altered.
+    //
+    // Example with multiple arguments and substitutions, showing
+    // jvm GC logging, and start of a passwordless JVM JMX agent so can
+    // connect with jconsole and the likes to watch child memory, threads
+    // and get thread dumps.
+    //
+    //  <property>
+    //    <name>mapred.map.child.java.opts</name>
+    //    <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    //  <property>
+    //    <name>mapred.reduce.child.java.opts</name>
+    //    <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    String javaOpts = getChildJavaOpts(conf, task.isMapTask());
+    javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
+    String [] javaOptsSplit = javaOpts.split(" ");
+
+    // Add java.library.path; necessary for loading native libraries.
+    //
+    // 1. We add the 'cwd' of the task to its java.library.path to help 
+    //    users distribute native libraries via the DistributedCache.
+    // 2. The user can also specify extra paths to be added to the 
+    //    java.library.path via mapred.{map|reduce}.child.java.opts.
+    //
+    String libraryPath = workDir;
+    boolean hasUserLDPath = false;
+    for(int i=0; i<javaOptsSplit.length ;i++) { 
+      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+        // TODO: Does the above take care of escaped space chars
+        javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+        hasUserLDPath = true;
+        break;
+      }
+    }
+    if(!hasUserLDPath) {
+      vargs.add("-Djava.library.path=" + libraryPath);
+    }
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    if (childTmpDir != null) {
+      vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+    }
+
+    // Setup the log4j prop
+    long logSize = TaskLog.getTaskLogLength(conf);
+    setupLog4jProperties(vargs, logSize, logDir, task);
+
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(task.isMapTask()
+                                   ).isIncluded(task.getPartition())) {
+        File prof = getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+            TaskLog.LogName.PROFILE);
+        vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+      }
+    }
+
+    // Add main class and its arguments 
+    vargs.add(YarnChild.class.getName());  // main of Child
+    // pass TaskAttemptListener's address
+    vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress()); 
+    vargs.add(Integer.toString(taskAttemptListenerAddr.getPort())); 
+    vargs.add(attemptID.toString());                      // pass task identifier
+    // pass task log location
+    // TODO: The following API uses system property hadoop.log.dir
+    String attemptLogDir = getAttemptDir(logDir, attemptID, task.isTaskCleanupTask()).toString();
+    vargs.add(attemptLogDir);
+
+    // Finally add the jvmID
+    vargs.add(String.valueOf(jvmID.getId()));
+    // File descriptor 1 is standard output and 2 is standard error, so
+    // each stream goes to the log file of the same name (the original
+    // had the two files swapped).
+    vargs.add("1>"
+        + getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+            TaskLog.LogName.STDOUT));
+    vargs.add("2>"
+        + getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+            TaskLog.LogName.STDERR));
+
+    // Final command: everything is merged into one shell command line that
+    // first creates the directories the child expects.
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    Vector<CharSequence> vargsFinal = new Vector<CharSequence>(8);
+    vargsFinal.add("mkdir work; mkdir -p " + attemptLogDir + "; "
+        + mergedCommand.toString());
+    return vargsFinal;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+/** Map-side task attempt: keeps the split metadata needed to build the {@link MapTask}. */
+public class MapTaskAttemptImpl extends TaskAttemptImpl {
+
+  private final TaskSplitMetaInfo splitInfo;
+
+  /** Passes the split's locations and the other arguments to the superclass. */
+  public MapTaskAttemptImpl(TaskID taskId, int attempt, EventHandler eventHandler,
+      Path jobFile, int partition, TaskSplitMetaInfo splitInfo, Configuration conf,
+      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken, Collection<Token<? extends TokenIdentifier>> fsTokens) {
+    super(taskId, attempt, eventHandler, taskAttemptListener, jobFile, partition,
+        conf, splitInfo.getLocations(), committer, jobToken, fsTokens);
+    this.splitInfo = splitInfo;
+  }
+
+  /** Builds the old-API {@link MapTask} for this attempt and sets its user and configuration. */
+  @Override
+  public Task createRemoteTask() {
+    // Job file is set in TaskAttempt, hence null; YARN has no slots per task, hence 1.
+    MapTask remote = new MapTask(null, TypeConverter.fromYarn(getID()), partition,
+        splitInfo.getSplitIndex(), 1);
+    remote.setUser(conf.get(MRJobConfig.USER_NAME));
+    remote.setConf(conf);
+    return remote;
+  }
+
+  /** @return 1, the value this class uses for map attempts. */
+  @Override
+  protected int getPriority() {
+    return 1;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+
+/** Reduce-side task attempt: keeps the map-task count needed to build the {@link ReduceTask}. */
+public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
+
+  private final int numMapTasks;
+
+  /** Passes an empty String array and the other arguments to the superclass. */
+  public ReduceTaskAttemptImpl(TaskID id, int attempt, EventHandler eventHandler,
+      Path jobFile, int partition, int numMapTasks, Configuration conf,
+      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken, Collection<Token<? extends TokenIdentifier>> fsTokens) {
+    super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition, conf,
+        new String[0], committer, jobToken, fsTokens);
+    this.numMapTasks = numMapTasks;
+  }
+
+  /** Builds the old-API {@link ReduceTask} for this attempt and sets its user and configuration. */
+  @Override
+  public Task createRemoteTask() {
+    // Job file is set in TaskAttempt, hence null; YARN has no slots per task, hence 1.
+    ReduceTask remote = new ReduceTask(null, TypeConverter.fromYarn(getID()), partition,
+        numMapTasks, 1);
+    remote.setUser(conf.get(MRJobConfig.USER_NAME));
+    remote.setConf(conf);
+    return remote;
+  }
+
+  /** @return 2, the value this class uses for reduce attempts. */
+  @Override
+  protected int getPriority() {
+    return 2;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * This class is responsible for talking to the task umbilical.
+ * It also converts all the old data structures
+ * to yarn data structures.
+ * 
+ * This class HAS to be in this package to access package private 
+ * methods/classes.
+ */
+public class TaskAttemptListenerImpl extends CompositeService
+    implements TaskUmbilicalProtocol, TaskAttemptListener {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class);
+
+  private final AppContext context;
+  private final JobTokenSecretManager jobTokenSecretManager;
+  private Server server; // created by startRpcServer(); null until then
+  private TaskHeartbeatHandler taskHeartbeatHandler;
+  private InetSocketAddress address; // set by startRpcServer()
+  // Tasks waiting to be claimed by their JVM; removed once handed out or unregistered.
+  private final Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap =
+      Collections.synchronizedMap(new HashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>());
+
+  /** Stores the collaborators; the heartbeat handler and RPC server are created later. */
+  public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) {
+    super(TaskAttemptListenerImpl.class.getName());
+    this.context = context;
+    this.jobTokenSecretManager = jobTokenSecretManager;
+  }
+
+  /** Adds the heartbeat handler as a child service, then initialises this service. */
+  @Override
+  public void init(Configuration conf) {
+    registerHeartbeatHandler();
+    super.init(conf);
+  }
+
+  /** Starts the umbilical RPC server, then delegates to the superclass start. */
+  @Override
+  public void start() {
+    startRpcServer();
+    super.start();
+  }
+
+  /** Creates the heartbeat handler and adds it as a child service of this composite. */
+  protected void registerHeartbeatHandler() {
+    taskHeartbeatHandler = new TaskHeartbeatHandler(context.getEventHandler());
+    addService(taskHeartbeatHandler);
+  }
+
+  /**
+   * Starts the umbilical RPC server on "0.0.0.0" with port 0, then records this host's
+   * canonical name plus the port the server actually bound as the advertised address.
+   *
+   * @throws YarnException wrapping any IOException raised while starting or resolving
+   */
+  protected void startRpcServer() {
+    try {
+      server = RPC.getServer(TaskUmbilicalProtocol.class, this, "0.0.0.0", 0, 1,
+          false, getConfig(), jobTokenSecretManager);
+      server.start();
+      // getLocalHost() is static, so call it on the class instead of on an instance.
+      String host = java.net.InetAddress.getLocalHost().getCanonicalHostName();
+      this.address = NetUtils.createSocketAddr(host + ":" + server.getListenerAddress().getPort());
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /** Stops the RPC server, then delegates to the superclass stop. */
+  @Override
+  public void stop() {
+    stopRpcServer();
+    super.stop();
+  }
+
+  /** Stops the RPC server; a no-op if startRpcServer() never created one. */
+  protected void stopRpcServer() {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  /** @return the address recorded by startRpcServer(), or null if it has not run yet. */
+  @Override
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
+  /**
+   * Child checking whether it can commit.
+   *
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.receivedPing(attemptID);
+
+    // Only the task that manages all of its attempts can decide which one may
+    // commit, so the request is redirected there.
+    Job job = context.getJob(attemptID.taskID.jobID);
+    return job.getTask(attemptID.taskID).canCommit(attemptID);
+  }
+
+  /**
+   * TaskAttempt is reporting that it is in commit_pending and it is waiting for
+   * the commit response.
+   *
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Commit-pending state update from " + taskAttemptID.toString());
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.receivedPing(attemptID);
+
+    // The reported status is not consulted; only a TA_COMMIT_PENDING event is raised.
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_COMMIT_PENDING));
+  }
+
+  /** Child reports the attempt is done: counts as a heartbeat and raises TA_DONE. */
+  @Override
+  public void done(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Done acknowledgement from " + taskAttemptID.toString());
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.receivedPing(attemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+  }
+
+  /** Fatal error in Child or Task: records the message as diagnostics, raises TA_FAILMSG. */
+  @Override
+  public void fatalError(TaskAttemptID taskAttemptID, String msg)
+      throws IOException {
+    LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
+    reportDiagnosticInfo(taskAttemptID, "Error: " + msg);
+    raiseFailMsg(taskAttemptID);
+  }
+
+  /** FSError in Child: records the message as diagnostics, then raises TA_FAILMSG. */
+  @Override
+  public void fsError(TaskAttemptID taskAttemptID, String message)
+      throws IOException {
+    LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: "
+        + message);
+    reportDiagnosticInfo(taskAttemptID, "FSError: " + message);
+    raiseFailMsg(taskAttemptID);
+  }
+
+  /** Raises a TA_FAILMSG event for the given old-API attempt id. */
+  private void raiseFailMsg(TaskAttemptID taskAttemptID) {
+    context.getEventHandler().handle(new TaskAttemptEvent(
+        TypeConverter.toYarn(taskAttemptID), TaskAttemptEventType.TA_FAILMSG));
+  }
+
+  /** Intentionally does nothing. */
+  @Override
+  public void shuffleError(TaskAttemptID taskAttemptID, String message) throws IOException {
+    // TODO: This isn't really used in any MR code. Ask for removal.
+  }
+
+  /**
+   * Asks the job of the requesting attempt for its attempt-completion events (passing
+   * fromEventId and maxEvents through), keeps only those of map attempts and converts
+   * them to the old-API type. The job is taken from taskAttemptID, not jobIdentifier.
+   */
+  @Override
+  public MapTaskCompletionEventsUpdate getMapCompletionEvents(
+      JobID jobIdentifier, int fromEventId, int maxEvents,
+      TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
+        + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+
+    // TODO: shouldReset is never used. See TT. Ask for Removal.
+    final boolean shouldReset = false;
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] events =
+        context.getJob(attemptID.taskID.jobID).getTaskAttemptCompletionEvents(
+            fromEventId, maxEvents);
+    taskHeartbeatHandler.receivedPing(attemptID);
+
+    List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
+    for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent event : events) {
+      if (TaskType.MAP.equals(event.attemptId.taskID.taskType)) {
+        mapEvents.add(TypeConverter.fromYarn(event));
+      }
+    }
+    return new MapTaskCompletionEventsUpdate(
+        mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+  }
+
+  /** Heartbeat from the child; always returns true. */
+  @Override
+  public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
+    LOG.info("Ping from " + taskAttemptID.toString());
+    taskHeartbeatHandler.receivedPing(TypeConverter.toYarn(taskAttemptID));
+    return true;
+  }
+
+  /**
+   * Raises the diagnostic text as a TaskAttemptDiagnosticsUpdateEvent; mainly used to
+   * propagate exception traces of tasks that fail.
+   *
+   * This call exists as a hadoop mapreduce legacy wherein all changes in
+   * counters/progress/phase/output-size are reported through statusUpdate()
+   * but diagnostic information is not.
+   */
+  @Override
+  public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
+      throws IOException {
+    LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
+        + diagnosticInfo);
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.receivedPing(attemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnosticInfo));
+  }
+
+  /**
+   * Copies the child's old-API status (progress, diagnostics, state string, output size,
+   * phase, counters, fetch failures) into a TaskAttemptStatus and raises it as an event.
+   *
+   * @return always true
+   */
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskAttemptID,
+      TaskStatus taskStatus) throws IOException, InterruptedException {
+    LOG.info("Status update from " + taskAttemptID.toString());
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID yarnAttemptID =
+        TypeConverter.toYarn(taskAttemptID);
+    taskHeartbeatHandler.receivedPing(yarnAttemptID);
+
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = yarnAttemptID;
+    taskAttemptStatus.progress = taskStatus.getProgress();
+    LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+        + taskStatus.getProgress());
+    taskAttemptStatus.diagnosticInfo = taskStatus.getDiagnosticInfo();
+    taskAttemptStatus.stateString = taskStatus.getStateString();
+    // Set the output-size when map-task finishes. Set by the task itself.
+    taskAttemptStatus.outputSize = taskStatus.getOutputSize();
+    taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
+    taskAttemptStatus.counters = TypeConverter.toYarn(taskStatus.getCounters());
+
+    // Fetch failures are copied only when some were reported.
+    if (taskStatus.getFetchFailedMaps() != null
+        && taskStatus.getFetchFailedMaps().size() > 0) {
+      taskAttemptStatus.fetchFailedMaps =
+          new ArrayList<org.apache.hadoop.mapreduce.v2.api.TaskAttemptID>();
+      for (TaskAttemptID failedMapId : taskStatus.getFetchFailedMaps()) {
+        taskAttemptStatus.fetchFailedMaps.add(TypeConverter.toYarn(failedMapId));
+      }
+    }
+
+    // The task's nextRecordRange is not propagated by this call.
+    // TODO: The following are not needed here, but need to be set somewhere inside AppMaster.
+    //    taskStatus.getRunState(); // Set by the TT/JT. Transform into a state TODO
+    //    taskStatus.getStartTime(); // Used to be set by the TaskTracker. This should be set by getTask().
+    //    taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set when task finishes
+    //    // This was used by TT to do counter updates only once every minute. So this
+    //    // isn't ever changed by the Task itself.
+    //    taskStatus.getIncludeCounters();
+
+    context.getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, taskAttemptStatus));
+    return true;
+  }
+
+  /** @return {@link TaskUmbilicalProtocol#versionID}, whatever the arguments. */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return TaskUmbilicalProtocol.versionID;
+  }
+
+  /**
+   * Used when the feature of skipping records is enabled; not implemented yet.
+   *
+   * @throws IOException always
+   */
+  @Override
+  public void reportNextRecordRange(TaskAttemptID taskAttemptID, Range range)
+      throws IOException {
+    // Legacy call: statusUpdate() does not carry the next record range.
+    throw new IOException("Not yet implemented.");
+  }
+
+  /**
+   * Hands the task registered for the calling JVM to that JVM and forgets the mapping.
+   *
+   * @return a JvmTask wrapping the task, or wrapping null when nothing is registered for
+   *         the JVM (there may be lag in the attempt getting added)
+   */
+  @Override
+  public JvmTask getTask(JvmContext context) throws IOException {
+    // A rough imitation of code from TaskTracker.
+    JVMId jvmId = context.jvmId;
+    LOG.info("JVM with ID : " + jvmId + " asked for a task");
+
+    // TODO: Is it an authorised container to get a task? Otherwise return null.
+    // TODO: Is the request for task-launch still valid?
+    // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
+    // to jobId and task-type.
+
+    WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
+        jvmId.getId());
+    // A single remove() both looks the task up and frees the entry, so there is no
+    // window between a get() and a remove() in which the task could be handed out twice.
+    org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.remove(wJvmID);
+    if (task != null) {
+      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+    }
+    return new JvmTask(task, false);
+  }
+
+  /** Not implemented; the reported sizes are ignored. */
+  @Override
+  public void updatePrivateDistributedCacheSizes(
+      org.apache.hadoop.mapreduce.JobID jobId, long[] sizes) throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+  /** Makes the task claimable through getTask() and registers the attempt for heartbeats. */
+  @Override
+  public void register(org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID,
+      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
+    // Keyed by JVM id so it is easy to look up when the JVM comes back to ask for its task.
+    jvmIDToAttemptMap.put(jvmID, task);
+    taskHeartbeatHandler.register(attemptID);
+  }
+
+  /** Removes the JVM's mapping if still present and unregisters the attempt's heartbeat. */
+  @Override
+  public void unregister(org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID,
+      WrappedJvmID jvmID) {
+    jvmIDToAttemptMap.remove(jvmID);
+    taskHeartbeatHandler.unregister(attemptID);
+  }
+
+}