Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [2/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Modified: hadoop/mapreduce/branches/MR-279/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/.gitignore?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/.gitignore (original)
+++ hadoop/mapreduce/branches/MR-279/.gitignore Thu Mar 17 20:21:13 2011
@@ -42,3 +42,7 @@ src/docs/build
src/docs/cn/build
src/docs/cn/src/documentation/sitemap.xmap
src/docs/cn/uming.conf
+.gitignore
+target
+SecurityAuth.audit
+conf/yarn-site.xml
Added: hadoop/mapreduce/branches/MR-279/INSTALL
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/INSTALL?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/INSTALL (added)
+++ hadoop/mapreduce/branches/MR-279/INSTALL Thu Mar 17 20:21:13 2011
@@ -0,0 +1,61 @@
+To compile Hadoop MapReduce next, do the following:
+
+Step 1) Download Hadoop Common
+
+svn checkout http://svn.apache.org/repos/asf/hadoop/common/branches/yahoo-merge/
+cd yahoo-merge
+ant veryclean mvn-install
+
+Step 2) Download Hadoop HDFS
+
+svn checkout http://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052/
+cd HDFS-1052
+ant veryclean mvn-install -Dresolvers=internal
+
+Step 3) Go to the root directory of hadoop mapreduce
+
+Step 4) Run
+
+mvn clean install assembly:assembly
+ant veryclean jar jar-test -Dresolvers=internal
+
+If you want to skip the tests, run:
+
+mvn clean install assembly:assembly -Dmaven.test.skip.exec=true
+ant veryclean jar jar-test -Dresolvers=internal
+
+You will find the tarball at:
+target/hadoop-mapreduce-1.0-SNAPSHOT-bin.tar.gz
+
+Step 5) Untar the tarball in a clean, separate directory,
+say $HADOOP_YARN_INSTALL.
+
+To run Hadoop MapReduce next applications:
+
+Step 6) cd $HADOOP_YARN_INSTALL
+
+Step 7) export the following variables:
+
+HADOOP_MAPRED_HOME=
+HADOOP_COMMON_HOME=
+HADOOP_HDFS_HOME=
+YARN_HOME=directory where you untarred yarn
+HADOOP_CONF_DIR=
+YARN_CONF_DIR=$HADOOP_CONF_DIR
+
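+For example (a sketch only; the paths below are assumptions, adjust them
+to your own checkout and install locations):
+
+export HADOOP_MAPRED_HOME=/home/user/src/hadoop-mapreduce
+export HADOOP_COMMON_HOME=/home/user/src/yahoo-merge
+export HADOOP_HDFS_HOME=/home/user/src/HDFS-1052
+export YARN_HOME=$HADOOP_YARN_INSTALL
+export HADOOP_CONF_DIR=$HADOOP_YARN_INSTALL/conf
+export YARN_CONF_DIR=$HADOOP_CONF_DIR
+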
+Step 8) bin/yarn-daemon.sh start resourcemanager
+
+Step 9) bin/yarn-daemon.sh start nodemanager
+
+Step 10) Create the following symlinks in hadoop-common/lib (run these from that directory):
+
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-api-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-common-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-common-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/hadoop-mapreduce-client-core-1.0-SNAPSHOT.jar .
+ln -s $HADOOP_YARN_INSTALL/modules/yarn-server-common-1.0-SNAPSHOT.jar .
+
+Step 11) You are all set. An example of how to run a job:
+
+$HADOOP_COMMON_HOME/bin/hadoop jar $HADOOP_MAPRED_HOME/build/hadoop-mapred-examples-0.22.0-SNAPSHOT.jar randomwriter -Dmapreduce.job.user.name=$USER -Dmapreduce.randomwriter.bytespermap=10000 -Ddfs.blocksize=536870912 -Ddfs.block.size=536870912 -libjars $HADOOP_YARN_INSTALL/hadoop-mapreduce-1.0-SNAPSHOT/modules/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar output
+
+
Added: hadoop/mapreduce/branches/MR-279/assembly/all.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/assembly/all.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/assembly/all.xml (added)
+++ hadoop/mapreduce/branches/MR-279/assembly/all.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,59 @@
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>bin</id>
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <!-- TODO: this layout is wrong. We need module specific bin files in module specific dirs -->
+ <fileSets>
+ <fileSet>
+ <directory>yarn/yarn-server/yarn-server-nodemanager/target/classes/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>container-executor</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>yarn/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>*</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>yarn/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <moduleSets>
+ <moduleSet>
+ <excludes>
+ <exclude>org.apache.hadoop:yarn-server-tests</exclude>
+ </excludes>
+ <binaries>
+ <outputDirectory>modules</outputDirectory>
+ <includeDependencies>false</includeDependencies>
+ <unpack>false</unpack>
+ </binaries>
+ </moduleSet>
+ </moduleSets>
+ <dependencySets>
+ <dependencySet>
+ <useProjectArtifact>false</useProjectArtifact>
+ <outputDirectory>/lib</outputDirectory>
+ <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+ <excludes>
+ <exclude>org.apache.hadoop:hadoop-common</exclude>
+ <exclude>org.apache.hadoop:hadoop-hdfs</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
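+  <!-- Note: this descriptor backs the "mvn clean install assembly:assembly"
+       step described in INSTALL: it produces the
+       target/hadoop-mapreduce-1.0-SNAPSHOT-bin.tar.gz tarball with scripts
+       under bin/, configuration under conf/, module jars under modules/ and
+       third-party jars under lib/. -->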
+</assembly>
Modified: hadoop/mapreduce/branches/MR-279/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/build.xml?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/build.xml (original)
+++ hadoop/mapreduce/branches/MR-279/build.xml Thu Mar 17 20:21:13 2011
@@ -1354,10 +1354,8 @@
</target>
<target name="clean-cache" description="Clean. Delete ivy cache">
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-common"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-common-test"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs"/>
- <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop/hadoop-hdfs-test"/>
+ <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop"/>
+ <delete dir="${user.home}/.ivy2/cache/org.apache.hadoop.mapreduce"/>
</target>
<target name="mvn-install-mapred" depends="mvn-taskdef,examples,tools,set-version">
Modified: hadoop/mapreduce/branches/MR-279/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy.xml?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy.xml (original)
+++ hadoop/mapreduce/branches/MR-279/ivy.xml Thu Mar 17 20:21:13 2011
@@ -72,6 +72,9 @@
<dependency org="checkstyle" name="checkstyle" rev="${checkstyle.version}"
conf="checkstyle->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
+ rev="${hadoop-mapreduce-client-core.version}" conf="common->default"/>
+
<dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
conf="jdiff->default"/>
<dependency org="xerces" name="xerces" rev="${xerces.version}"
Modified: hadoop/mapreduce/branches/MR-279/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy/libraries.properties?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy/libraries.properties (original)
+++ hadoop/mapreduce/branches/MR-279/ivy/libraries.properties Thu Mar 17 20:21:13 2011
@@ -32,3 +32,6 @@ jdiff.version=1.0.9
rats-lib.version=0.6
xerces.version=1.4.4
+
+yarn.version=1.0-SNAPSHOT
+hadoop-mapreduce-client-core.version=1.0-SNAPSHOT
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,100 @@
+<?xml version="1.0"?>
+<project>
+ <parent>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>${yarn.version}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <name>hadoop-mapreduce-client-app</name>
+ <version>${yarn.version}</version>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <!-- begin MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-api</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-common</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server</artifactId>
+ <version>${yarn.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- end MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-common</artifactId>
+ <version>${yarn.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-common</artifactId>
+ <version>${yarn.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-nodemanager</artifactId>
+ <version>${yarn.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-resourcemanager</artifactId>
+ <version>${yarn.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.3.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
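+      <!-- The following execution writes this module's dependency classpath
+           to target/classes/mrapp-generated-classpath during
+           generate-sources. Because it lands under target/classes, the file
+           is packaged into the module jar, where it can later be read back
+           as a classpath resource (the intended consumer is an assumption
+           not spelled out in this commit). -->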
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>build-classpath</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>build-classpath</goal>
+ </goals>
+ <configuration>
+ <outputFile>target/classes/mrapp-generated-classpath</outputFile>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/FailingMapper.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Fails the Mapper: the first attempt throws an exception; subsequent
+ * attempts call System.exit.
+ */
+public class FailingMapper extends Mapper<Text, Text, Text, Text> {
+ public void map(Text key, Text value,
+ Context context) throws IOException,InterruptedException {
+ if (context.getTaskAttemptID().getId() == 0) {
+ System.out.println("Attempt:" + context.getTaskAttemptID() +
+ " Failing mapper throwing exception");
+ throw new IOException("Attempt:" + context.getTaskAttemptID() +
+ " Failing mapper throwing exception");
+ } else {
+ System.out.println("Attempt:" + context.getTaskAttemptID() +
+ " Exiting");
+ System.exit(-1);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/RandomTextWriterJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,758 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class RandomTextWriterJob extends Configured implements Tool {
+
+ public static final String TOTAL_BYTES =
+ "mapreduce.randomtextwriter.totalbytes";
+ public static final String BYTES_PER_MAP =
+ "mapreduce.randomtextwriter.bytespermap";
+ public static final String MAX_VALUE = "mapreduce.randomtextwriter.maxwordsvalue";
+ public static final String MIN_VALUE = "mapreduce.randomtextwriter.minwordsvalue";
+ public static final String MIN_KEY = "mapreduce.randomtextwriter.minwordskey";
+ public static final String MAX_KEY = "mapreduce.randomtextwriter.maxwordskey";
+
+ static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+
+ public Job createJob(Configuration conf) throws IOException {
+ long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 10 * 1024);
+ long totalBytesToWrite = conf.getLong(TOTAL_BYTES, numBytesToWritePerMap);
+ int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
+ if (numMaps == 0 && totalBytesToWrite > 0) {
+ numMaps = 1;
+ conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
+ }
+ conf.setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+ Job job = new Job(conf);
+
+ job.setJarByClass(RandomTextWriterJob.class);
+ job.setJobName("random-text-writer");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(RandomInputFormat.class);
+ job.setMapperClass(RandomTextMapper.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ //FileOutputFormat.setOutputPath(job, new Path("random-output"));
+ job.setNumReduceTasks(0);
+ return job;
+ }
+
+ public static class RandomInputFormat extends InputFormat<Text, Text> {
+
+ /**
+ * Generate the requested number of file splits, with the filename
+ * set to the filename of the output file.
+ */
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ List<InputSplit> result = new ArrayList<InputSplit>();
+ Path outDir = FileOutputFormat.getOutputPath(job);
+ int numSplits =
+ job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
+ for(int i=0; i < numSplits; ++i) {
+ result.add(new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
+ (String[])null));
+ }
+ return result;
+ }
+
+ /**
+ * Return a single record (filename, "") where the filename is taken from
+ * the file split.
+ */
+ public static class RandomRecordReader extends RecordReader<Text, Text> {
+ Path name;
+ Text key = null;
+ Text value = new Text();
+ public RandomRecordReader(Path p) {
+ name = p;
+ }
+
+ public void initialize(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ }
+
+ public boolean nextKeyValue() {
+ if (name != null) {
+ key = new Text();
+ key.set(name.getName());
+ name = null;
+ return true;
+ }
+ return false;
+ }
+
+ public Text getCurrentKey() {
+ return key;
+ }
+
+ public Text getCurrentValue() {
+ return value;
+ }
+
+ public void close() {}
+
+ public float getProgress() {
+ return 0.0f;
+ }
+ }
+
+ public RecordReader<Text, Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new RandomRecordReader(((FileSplit) split).getPath());
+ }
+ }
+
+ public static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
+
+ private long numBytesToWrite;
+ private int minWordsInKey;
+ private int wordsInKeyRange;
+ private int minWordsInValue;
+ private int wordsInValueRange;
+ private Random random = new Random();
+
+ /**
+ * Save the configuration value that we need to write the data.
+ */
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ numBytesToWrite = conf.getLong(BYTES_PER_MAP,
+ 1*1024*1024*1024);
+ minWordsInKey = conf.getInt(MIN_KEY, 5);
+ wordsInKeyRange = (conf.getInt(MAX_KEY, 10) - minWordsInKey);
+ minWordsInValue = conf.getInt(MIN_VALUE, 10);
+ wordsInValueRange = (conf.getInt(MAX_VALUE, 100) - minWordsInValue);
+ }
+
+ /**
+ * Given an output filename, write a bunch of random records to it.
+ */
+ public void map(Text key, Text value,
+ Context context) throws IOException,InterruptedException {
+ int itemCount = 0;
+ while (numBytesToWrite > 0) {
+ // Generate the key/value
+ int noWordsKey = minWordsInKey +
+ (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
+ int noWordsValue = minWordsInValue +
+ (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
+ Text keyWords = generateSentence(noWordsKey);
+ Text valueWords = generateSentence(noWordsValue);
+
+ // Write the sentence
+ context.write(keyWords, valueWords);
+
+ numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
+
+ // Update counters, progress etc.
+ context.getCounter(Counters.BYTES_WRITTEN).increment(
+ keyWords.getLength() + valueWords.getLength());
+ context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
+ if (++itemCount % 200 == 0) {
+ context.setStatus("wrote record " + itemCount + ". " +
+ numBytesToWrite + " bytes left.");
+ }
+ }
+ context.setStatus("done with " + itemCount + " records.");
+ }
+
+ private Text generateSentence(int noWords) {
+ StringBuffer sentence = new StringBuffer();
+ String space = " ";
+ for (int i=0; i < noWords; ++i) {
+ sentence.append(words[random.nextInt(words.length)]);
+ sentence.append(space);
+ }
+ return new Text(sentence.toString());
+ }
+
+ private static String[] words = {
+ "diurnalness", "Homoiousian",
+ "spiranthic", "tetragynian",
+ "silverhead", "ungreat",
+ "lithograph", "exploiter",
+ "physiologian", "by",
+ "hellbender", "Filipendula",
+ "undeterring", "antiscolic",
+ "pentagamist", "hypoid",
+ "cacuminal", "sertularian",
+ "schoolmasterism", "nonuple",
+ "gallybeggar", "phytonic",
+ "swearingly", "nebular",
+ "Confervales", "thermochemically",
+ "characinoid", "cocksuredom",
+ "fallacious", "feasibleness",
+ "debromination", "playfellowship",
+ "tramplike", "testa",
+ "participatingly", "unaccessible",
+ "bromate", "experientialist",
+ "roughcast", "docimastical",
+ "choralcelo", "blightbird",
+ "peptonate", "sombreroed",
+ "unschematized", "antiabolitionist",
+ "besagne", "mastication",
+ "bromic", "sviatonosite",
+ "cattimandoo", "metaphrastical",
+ "endotheliomyoma", "hysterolysis",
+ "unfulminated", "Hester",
+ "oblongly", "blurredness",
+ "authorling", "chasmy",
+ "Scorpaenidae", "toxihaemia",
+ "Dictograph", "Quakerishly",
+ "deaf", "timbermonger",
+ "strammel", "Thraupidae",
+ "seditious", "plerome",
+ "Arneb", "eristically",
+ "serpentinic", "glaumrie",
+ "socioromantic", "apocalypst",
+ "tartrous", "Bassaris",
+ "angiolymphoma", "horsefly",
+ "kenno", "astronomize",
+ "euphemious", "arsenide",
+ "untongued", "parabolicness",
+ "uvanite", "helpless",
+ "gemmeous", "stormy",
+ "templar", "erythrodextrin",
+ "comism", "interfraternal",
+ "preparative", "parastas",
+ "frontoorbital", "Ophiosaurus",
+ "diopside", "serosanguineous",
+ "ununiformly", "karyological",
+ "collegian", "allotropic",
+ "depravity", "amylogenesis",
+ "reformatory", "epidymides",
+ "pleurotropous", "trillium",
+ "dastardliness", "coadvice",
+ "embryotic", "benthonic",
+ "pomiferous", "figureheadship",
+ "Megaluridae", "Harpa",
+ "frenal", "commotion",
+ "abthainry", "cobeliever",
+ "manilla", "spiciferous",
+ "nativeness", "obispo",
+ "monilioid", "biopsic",
+ "valvula", "enterostomy",
+ "planosubulate", "pterostigma",
+ "lifter", "triradiated",
+ "venialness", "tum",
+ "archistome", "tautness",
+ "unswanlike", "antivenin",
+ "Lentibulariaceae", "Triphora",
+ "angiopathy", "anta",
+ "Dawsonia", "becomma",
+ "Yannigan", "winterproof",
+ "antalgol", "harr",
+ "underogating", "ineunt",
+ "cornberry", "flippantness",
+ "scyphostoma", "approbation",
+ "Ghent", "Macraucheniidae",
+ "scabbiness", "unanatomized",
+ "photoelasticity", "eurythermal",
+ "enation", "prepavement",
+ "flushgate", "subsequentially",
+ "Edo", "antihero",
+ "Isokontae", "unforkedness",
+ "porriginous", "daytime",
+ "nonexecutive", "trisilicic",
+ "morphiomania", "paranephros",
+ "botchedly", "impugnation",
+ "Dodecatheon", "obolus",
+ "unburnt", "provedore",
+ "Aktistetae", "superindifference",
+ "Alethea", "Joachimite",
+ "cyanophilous", "chorograph",
+ "brooky", "figured",
+ "periclitation", "quintette",
+ "hondo", "ornithodelphous",
+ "unefficient", "pondside",
+ "bogydom", "laurinoxylon",
+ "Shiah", "unharmed",
+ "cartful", "noncrystallized",
+ "abusiveness", "cromlech",
+ "japanned", "rizzomed",
+ "underskin", "adscendent",
+ "allectory", "gelatinousness",
+ "volcano", "uncompromisingly",
+ "cubit", "idiotize",
+ "unfurbelowed", "undinted",
+ "magnetooptics", "Savitar",
+ "diwata", "ramosopalmate",
+ "Pishquow", "tomorn",
+ "apopenptic", "Haversian",
+ "Hysterocarpus", "ten",
+ "outhue", "Bertat",
+ "mechanist", "asparaginic",
+ "velaric", "tonsure",
+ "bubble", "Pyrales",
+ "regardful", "glyphography",
+ "calabazilla", "shellworker",
+ "stradametrical", "havoc",
+ "theologicopolitical", "sawdust",
+ "diatomaceous", "jajman",
+ "temporomastoid", "Serrifera",
+ "Ochnaceae", "aspersor",
+ "trailmaking", "Bishareen",
+ "digitule", "octogynous",
+ "epididymitis", "smokefarthings",
+ "bacillite", "overcrown",
+ "mangonism", "sirrah",
+ "undecorated", "psychofugal",
+ "bismuthiferous", "rechar",
+ "Lemuridae", "frameable",
+ "thiodiazole", "Scanic",
+ "sportswomanship", "interruptedness",
+ "admissory", "osteopaedion",
+ "tingly", "tomorrowness",
+ "ethnocracy", "trabecular",
+ "vitally", "fossilism",
+ "adz", "metopon",
+ "prefatorial", "expiscate",
+ "diathermacy", "chronist",
+ "nigh", "generalizable",
+ "hysterogen", "aurothiosulphuric",
+ "whitlowwort", "downthrust",
+ "Protestantize", "monander",
+ "Itea", "chronographic",
+ "silicize", "Dunlop",
+ "eer", "componental",
+ "spot", "pamphlet",
+ "antineuritic", "paradisean",
+ "interruptor", "debellator",
+ "overcultured", "Florissant",
+ "hyocholic", "pneumatotherapy",
+ "tailoress", "rave",
+ "unpeople", "Sebastian",
+ "thermanesthesia", "Coniferae",
+ "swacking", "posterishness",
+ "ethmopalatal", "whittle",
+ "analgize", "scabbardless",
+ "naught", "symbiogenetically",
+ "trip", "parodist",
+ "columniform", "trunnel",
+ "yawler", "goodwill",
+ "pseudohalogen", "swangy",
+ "cervisial", "mediateness",
+ "genii", "imprescribable",
+ "pony", "consumptional",
+ "carposporangial", "poleax",
+ "bestill", "subfebrile",
+ "sapphiric", "arrowworm",
+ "qualminess", "ultraobscure",
+ "thorite", "Fouquieria",
+ "Bermudian", "prescriber",
+ "elemicin", "warlike",
+ "semiangle", "rotular",
+ "misthread", "returnability",
+ "seraphism", "precostal",
+ "quarried", "Babylonism",
+ "sangaree", "seelful",
+ "placatory", "pachydermous",
+ "bozal", "galbulus",
+ "spermaphyte", "cumbrousness",
+ "pope", "signifier",
+ "Endomycetaceae", "shallowish",
+ "sequacity", "periarthritis",
+ "bathysphere", "pentosuria",
+ "Dadaism", "spookdom",
+ "Consolamentum", "afterpressure",
+ "mutter", "louse",
+ "ovoviviparous", "corbel",
+ "metastoma", "biventer",
+ "Hydrangea", "hogmace",
+ "seizing", "nonsuppressed",
+ "oratorize", "uncarefully",
+ "benzothiofuran", "penult",
+ "balanocele", "macropterous",
+ "dishpan", "marten",
+ "absvolt", "jirble",
+ "parmelioid", "airfreighter",
+ "acocotl", "archesporial",
+ "hypoplastral", "preoral",
+ "quailberry", "cinque",
+ "terrestrially", "stroking",
+ "limpet", "moodishness",
+ "canicule", "archididascalian",
+ "pompiloid", "overstaid",
+ "introducer", "Italical",
+ "Christianopaganism", "prescriptible",
+ "subofficer", "danseuse",
+ "cloy", "saguran",
+ "frictionlessly", "deindividualization",
+ "Bulanda", "ventricous",
+ "subfoliar", "basto",
+ "scapuloradial", "suspend",
+ "stiffish", "Sphenodontidae",
+ "eternal", "verbid",
+ "mammonish", "upcushion",
+ "barkometer", "concretion",
+ "preagitate", "incomprehensible",
+ "tristich", "visceral",
+ "hemimelus", "patroller",
+ "stentorophonic", "pinulus",
+ "kerykeion", "brutism",
+ "monstership", "merciful",
+ "overinstruct", "defensibly",
+ "bettermost", "splenauxe",
+ "Mormyrus", "unreprimanded",
+ "taver", "ell",
+ "proacquittal", "infestation",
+ "overwoven", "Lincolnlike",
+ "chacona", "Tamil",
+ "classificational", "lebensraum",
+ "reeveland", "intuition",
+ "Whilkut", "focaloid",
+ "Eleusinian", "micromembrane",
+ "byroad", "nonrepetition",
+ "bacterioblast", "brag",
+ "ribaldrous", "phytoma",
+ "counteralliance", "pelvimetry",
+ "pelf", "relaster",
+ "thermoresistant", "aneurism",
+ "molossic", "euphonym",
+ "upswell", "ladhood",
+ "phallaceous", "inertly",
+ "gunshop", "stereotypography",
+ "laryngic", "refasten",
+ "twinling", "oflete",
+ "hepatorrhaphy", "electrotechnics",
+ "cockal", "guitarist",
+ "topsail", "Cimmerianism",
+ "larklike", "Llandovery",
+ "pyrocatechol", "immatchable",
+ "chooser", "metrocratic",
+ "craglike", "quadrennial",
+ "nonpoisonous", "undercolored",
+ "knob", "ultratense",
+ "balladmonger", "slait",
+ "sialadenitis", "bucketer",
+ "magnificently", "unstipulated",
+ "unscourged", "unsupercilious",
+ "packsack", "pansophism",
+ "soorkee", "percent",
+ "subirrigate", "champer",
+ "metapolitics", "spherulitic",
+ "involatile", "metaphonical",
+ "stachyuraceous", "speckedness",
+ "bespin", "proboscidiform",
+ "gul", "squit",
+ "yeelaman", "peristeropode",
+ "opacousness", "shibuichi",
+ "retinize", "yote",
+ "misexposition", "devilwise",
+ "pumpkinification", "vinny",
+ "bonze", "glossing",
+ "decardinalize", "transcortical",
+ "serphoid", "deepmost",
+ "guanajuatite", "wemless",
+ "arval", "lammy",
+ "Effie", "Saponaria",
+ "tetrahedral", "prolificy",
+ "excerpt", "dunkadoo",
+ "Spencerism", "insatiately",
+ "Gilaki", "oratorship",
+ "arduousness", "unbashfulness",
+ "Pithecolobium", "unisexuality",
+ "veterinarian", "detractive",
+ "liquidity", "acidophile",
+ "proauction", "sural",
+ "totaquina", "Vichyite",
+ "uninhabitedness", "allegedly",
+ "Gothish", "manny",
+ "Inger", "flutist",
+ "ticktick", "Ludgatian",
+ "homotransplant", "orthopedical",
+ "diminutively", "monogoneutic",
+ "Kenipsim", "sarcologist",
+ "drome", "stronghearted",
+ "Fameuse", "Swaziland",
+ "alen", "chilblain",
+ "beatable", "agglomeratic",
+ "constitutor", "tendomucoid",
+ "porencephalous", "arteriasis",
+ "boser", "tantivy",
+ "rede", "lineamental",
+ "uncontradictableness", "homeotypical",
+ "masa", "folious",
+ "dosseret", "neurodegenerative",
+ "subtransverse", "Chiasmodontidae",
+ "palaeotheriodont", "unstressedly",
+ "chalcites", "piquantness",
+ "lampyrine", "Aplacentalia",
+ "projecting", "elastivity",
+ "isopelletierin", "bladderwort",
+ "strander", "almud",
+ "iniquitously", "theologal",
+ "bugre", "chargeably",
+ "imperceptivity", "meriquinoidal",
+ "mesophyte", "divinator",
+ "perfunctory", "counterappellant",
+ "synovial", "charioteer",
+ "crystallographical", "comprovincial",
+ "infrastapedial", "pleasurehood",
+ "inventurous", "ultrasystematic",
+ "subangulated", "supraoesophageal",
+ "Vaishnavism", "transude",
+ "chrysochrous", "ungrave",
+ "reconciliable", "uninterpleaded",
+ "erlking", "wherefrom",
+ "aprosopia", "antiadiaphorist",
+ "metoxazine", "incalculable",
+ "umbellic", "predebit",
+ "foursquare", "unimmortal",
+ "nonmanufacture", "slangy",
+ "predisputant", "familist",
+ "preaffiliate", "friarhood",
+ "corelysis", "zoonitic",
+ "halloo", "paunchy",
+ "neuromimesis", "aconitine",
+ "hackneyed", "unfeeble",
+ "cubby", "autoschediastical",
+ "naprapath", "lyrebird",
+ "inexistency", "leucophoenicite",
+ "ferrogoslarite", "reperuse",
+ "uncombable", "tambo",
+ "propodiale", "diplomatize",
+ "Russifier", "clanned",
+ "corona", "michigan",
+ "nonutilitarian", "transcorporeal",
+ "bought", "Cercosporella",
+ "stapedius", "glandularly",
+ "pictorially", "weism",
+ "disilane", "rainproof",
+ "Caphtor", "scrubbed",
+ "oinomancy", "pseudoxanthine",
+ "nonlustrous", "redesertion",
+ "Oryzorictinae", "gala",
+ "Mycogone", "reappreciate",
+ "cyanoguanidine", "seeingness",
+ "breadwinner", "noreast",
+ "furacious", "epauliere",
+ "omniscribent", "Passiflorales",
+ "uninductive", "inductivity",
+ "Orbitolina", "Semecarpus",
+ "migrainoid", "steprelationship",
+ "phlogisticate", "mesymnion",
+ "sloped", "edificator",
+ "beneficent", "culm",
+ "paleornithology", "unurban",
+ "throbless", "amplexifoliate",
+ "sesquiquintile", "sapience",
+ "astucious", "dithery",
+ "boor", "ambitus",
+ "scotching", "uloid",
+ "uncompromisingness", "hoove",
+ "waird", "marshiness",
+ "Jerusalem", "mericarp",
+ "unevoked", "benzoperoxide",
+ "outguess", "pyxie",
+ "hymnic", "euphemize",
+ "mendacity", "erythremia",
+ "rosaniline", "unchatteled",
+ "lienteria", "Bushongo",
+ "dialoguer", "unrepealably",
+ "rivethead", "antideflation",
+ "vinegarish", "manganosiderite",
+ "doubtingness", "ovopyriform",
+ "Cephalodiscus", "Muscicapa",
+ "Animalivora", "angina",
+ "planispheric", "ipomoein",
+ "cuproiodargyrite", "sandbox",
+ "scrat", "Munnopsidae",
+ "shola", "pentafid",
+ "overstudiousness", "times",
+ "nonprofession", "appetible",
+ "valvulotomy", "goladar",
+ "uniarticular", "oxyterpene",
+ "unlapsing", "omega",
+ "trophonema", "seminonflammable",
+ "circumzenithal", "starer",
+ "depthwise", "liberatress",
+ "unleavened", "unrevolting",
+ "groundneedle", "topline",
+ "wandoo", "umangite",
+ "ordinant", "unachievable",
+ "oversand", "snare",
+ "avengeful", "unexplicit",
+ "mustafina", "sonable",
+ "rehabilitative", "eulogization",
+ "papery", "technopsychology",
+ "impressor", "cresylite",
+ "entame", "transudatory",
+ "scotale", "pachydermatoid",
+ "imaginary", "yeat",
+ "slipped", "stewardship",
+ "adatom", "cockstone",
+ "skyshine", "heavenful",
+ "comparability", "exprobratory",
+ "dermorhynchous", "parquet",
+ "cretaceous", "vesperal",
+ "raphis", "undangered",
+ "Glecoma", "engrain",
+ "counteractively", "Zuludom",
+ "orchiocatabasis", "Auriculariales",
+ "warriorwise", "extraorganismal",
+ "overbuilt", "alveolite",
+ "tetchy", "terrificness",
+ "widdle", "unpremonished",
+ "rebilling", "sequestrum",
+ "equiconvex", "heliocentricism",
+ "catabaptist", "okonite",
+ "propheticism", "helminthagogic",
+ "calycular", "giantly",
+ "wingable", "golem",
+ "unprovided", "commandingness",
+ "greave", "haply",
+ "doina", "depressingly",
+ "subdentate", "impairment",
+ "decidable", "neurotrophic",
+ "unpredict", "bicorporeal",
+ "pendulant", "flatman",
+ "intrabred", "toplike",
+ "Prosobranchiata", "farrantly",
+ "toxoplasmosis", "gorilloid",
+ "dipsomaniacal", "aquiline",
+ "atlantite", "ascitic",
+ "perculsive", "prospectiveness",
+ "saponaceous", "centrifugalization",
+ "dinical", "infravaginal",
+ "beadroll", "affaite",
+ "Helvidian", "tickleproof",
+ "abstractionism", "enhedge",
+ "outwealth", "overcontribute",
+ "coldfinch", "gymnastic",
+ "Pincian", "Munychian",
+ "codisjunct", "quad",
+ "coracomandibular", "phoenicochroite",
+ "amender", "selectivity",
+ "putative", "semantician",
+ "lophotrichic", "Spatangoidea",
+ "saccharogenic", "inferent",
+ "Triconodonta", "arrendation",
+ "sheepskin", "taurocolla",
+ "bunghole", "Machiavel",
+ "triakistetrahedral", "dehairer",
+ "prezygapophysial", "cylindric",
+ "pneumonalgia", "sleigher",
+ "emir", "Socraticism",
+ "licitness", "massedly",
+ "instructiveness", "sturdied",
+ "redecrease", "starosta",
+ "evictor", "orgiastic",
+ "squdge", "meloplasty",
+ "Tsonecan", "repealableness",
+ "swoony", "myesthesia",
+ "molecule", "autobiographist",
+ "reciprocation", "refective",
+ "unobservantness", "tricae",
+ "ungouged", "floatability",
+ "Mesua", "fetlocked",
+ "chordacentrum", "sedentariness",
+ "various", "laubanite",
+ "nectopod", "zenick",
+ "sequentially", "analgic",
+ "biodynamics", "posttraumatic",
+ "nummi", "pyroacetic",
+ "bot", "redescend",
+ "dispermy", "undiffusive",
+ "circular", "trillion",
+ "Uraniidae", "ploration",
+ "discipular", "potentness",
+ "sud", "Hu",
+ "Eryon", "plugger",
+ "subdrainage", "jharal",
+ "abscission", "supermarket",
+ "countergabion", "glacierist",
+ "lithotresis", "minniebush",
+ "zanyism", "eucalypteol",
+ "sterilely", "unrealize",
+ "unpatched", "hypochondriacism",
+ "critically", "cheesecutter",
+ };
+ }
+
+ /**
+ * This is the main routine for launching a distributed random write job.
+ * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
+ * The reduce doesn't do anything.
+ *
+ * @throws IOException
+ */
+ public int run(String[] args) throws Exception {
+ if (args.length == 0) {
+ return printUsage();
+ }
+ Job job = createJob(getConf());
+ FileOutputFormat.setOutputPath(job, new Path(args[0]));
+ Date startTime = new Date();
+ System.out.println("Job started: " + startTime);
+ int ret = job.waitForCompletion(true) ? 0 : 1;
+ Date endTime = new Date();
+ System.out.println("Job ended: " + endTime);
+ System.out.println("The job took " +
+ (endTime.getTime() - startTime.getTime()) /1000 +
+ " seconds.");
+
+ return ret;
+ }
+
+ static int printUsage() {
+ System.out.println("randomtextwriter " +
+ "[-outFormat <output format class>] " +
+ "<output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return 2;
+ }
+
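+  // Example invocation (a sketch; the jar path and output directory are
+  // assumptions, not part of this commit):
+  //
+  //   $HADOOP_COMMON_HOME/bin/hadoop jar <app-jar> \
+  //       org.apache.hadoop.RandomTextWriterJob \
+  //       -Dmapreduce.randomtextwriter.totalbytes=10485760 <output-dir>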
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new RandomTextWriterJob(),
+ args);
+ System.exit(res);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/SleepJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,274 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop;
+
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing the MR framework. Sleeps for a defined period
+ * of time in mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that generated number of input pairs is in the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ */
+public class SleepJob extends Configured implements Tool {
+ public static String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
+ public static String REDUCE_SLEEP_COUNT =
+ "mapreduce.sleepjob.reduce.sleep.count";
+ public static String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
+ public static String REDUCE_SLEEP_TIME =
+ "mapreduce.sleepjob.reduce.sleep.time";
+
+ public static class SleepJobPartitioner extends
+ Partitioner<IntWritable, NullWritable> {
+ public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+ return k.get() % numPartitions;
+ }
+ }
+
+ public static class EmptySplit extends InputSplit implements Writable {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() { return 0L; }
+ public String[] getLocations() { return new String[0]; }
+ }
+
+ public static class SleepInputFormat
+ extends InputFormat<IntWritable,IntWritable> {
+
+ public List<InputSplit> getSplits(JobContext jobContext) {
+ List<InputSplit> ret = new ArrayList<InputSplit>();
+ int numSplits = jobContext.getConfiguration().
+ getInt(MRJobConfig.NUM_MAPS, 1);
+ for (int i = 0; i < numSplits; ++i) {
+ ret.add(new EmptySplit());
+ }
+ return ret;
+ }
+
+ public RecordReader<IntWritable,IntWritable> createRecordReader(
+ InputSplit ignored, TaskAttemptContext taskContext)
+ throws IOException {
+ Configuration conf = taskContext.getConfiguration();
+ final int count = conf.getInt(MAP_SLEEP_COUNT, 1);
+ if (count < 0) throw new IOException("Invalid map count: " + count);
+ final int redcount = conf.getInt(REDUCE_SLEEP_COUNT, 1);
+ if (redcount < 0)
+ throw new IOException("Invalid reduce count: " + redcount);
+ final int emitPerMapTask = (redcount * taskContext.getNumReduceTasks());
+
+ return new RecordReader<IntWritable,IntWritable>() {
+ private int records = 0;
+ private int emitCount = 0;
+ private IntWritable key = null;
+ private IntWritable value = null;
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ }
+
+ public boolean nextKeyValue()
+ throws IOException {
+ key = new IntWritable();
+ key.set(emitCount);
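+        // Spread emitPerMapTask emits evenly across the 'count' input
+        // records; the first (emitPerMapTask % count) records emit one extra.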
+ int emit = emitPerMapTask / count;
+ if ((emitPerMapTask) % count > records) {
+ ++emit;
+ }
+ emitCount += emit;
+ value = new IntWritable();
+ value.set(emit);
+ return records++ < count;
+ }
+ public IntWritable getCurrentKey() { return key; }
+ public IntWritable getCurrentValue() { return value; }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException {
+ return records / ((float)count);
+ }
+ };
+ }
+ }
+
+ public static class SleepMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ private long mapSleepDuration = 100;
+ private int mapSleepCount = 1;
+ private int count = 0;
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.mapSleepCount =
+ conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+ this.mapSleepDuration =
+ conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+ }
+
+ public void map(IntWritable key, IntWritable value, Context context
+ ) throws IOException, InterruptedException {
+ //it is expected that every map processes mapSleepCount number of records.
+ try {
+ context.setStatus("Sleeping... (" +
+ (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+ Thread.sleep(mapSleepDuration);
+ }
+ catch (InterruptedException ex) {
+ throw (IOException)new IOException(
+ "Interrupted while sleeping").initCause(ex);
+ }
+ ++count;
+ // output reduceSleepCount * numReduce number of random values, so that
+ // each reducer will get reduceSleepCount number of keys.
+ int k = key.get();
+ for (int i = 0; i < value.get(); ++i) {
+ context.write(new IntWritable(k + i), NullWritable.get());
+ }
+ }
+ }
+
+ public static class SleepReducer
+ extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
+ private long reduceSleepDuration = 100;
+ private int reduceSleepCount = 1;
+ private int count = 0;
+
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.reduceSleepCount =
+ conf.getInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+ this.reduceSleepDuration =
+ conf.getLong(REDUCE_SLEEP_TIME , 100) / reduceSleepCount;
+ }
+
+ public void reduce(IntWritable key, Iterable<NullWritable> values,
+ Context context)
+ throws IOException {
+ try {
+ context.setStatus("Sleeping... (" +
+ (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+ Thread.sleep(reduceSleepDuration);
+
+ }
+ catch (InterruptedException ex) {
+ throw (IOException)new IOException(
+ "Interrupted while sleeping").initCause(ex);
+ }
+ count++;
+ }
+ }
+
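+  // Example invocation (a sketch; the jar path is an assumption, not part
+  // of this commit):
+  //
+  //   $HADOOP_COMMON_HOME/bin/hadoop jar <app-jar> \
+  //       org.apache.hadoop.SleepJob -m 4 -r 2 -mt 2000 -rt 2000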
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
+ System.exit(res);
+ }
+
+ public Job createJob(int numMapper, int numReducer,
+ long mapSleepTime, int mapSleepCount,
+ long reduceSleepTime, int reduceSleepCount)
+ throws IOException {
+ Configuration conf = getConf();
+ conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+ conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+ conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+ conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+ conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+ Job job = Job.getInstance(conf, "sleep");
+ job.setNumReduceTasks(numReducer);
+ job.setJarByClass(SleepJob.class);
+ job.setMapperClass(SleepMapper.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setReducerClass(SleepReducer.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setInputFormatClass(SleepInputFormat.class);
+ job.setPartitionerClass(SleepJobPartitioner.class);
+ job.setSpeculativeExecution(false);
+ job.setJobName("Sleep job");
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ return job;
+ }
+
+ public int run(String[] args) throws Exception {
+
+ if(args.length < 1) {
+ System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+ " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+ " [-recordt recordSleepTime (msec)]");
+ ToolRunner.printGenericCommandUsage(System.err);
+ return 2;
+ }
+
+ int numMapper = 1, numReducer = 1;
+ long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+ int mapSleepCount = 1, reduceSleepCount = 1;
+
+ for(int i=0; i < args.length; i++ ) {
+ if(args[i].equals("-m")) {
+ numMapper = Integer.parseInt(args[++i]);
+ }
+ else if(args[i].equals("-r")) {
+ numReducer = Integer.parseInt(args[++i]);
+ }
+ else if(args[i].equals("-mt")) {
+ mapSleepTime = Long.parseLong(args[++i]);
+ }
+ else if(args[i].equals("-rt")) {
+ reduceSleepTime = Long.parseLong(args[++i]);
+ }
+ else if (args[i].equals("-recordt")) {
+ recSleepTime = Long.parseLong(args[++i]);
+ }
+ }
+
+ // sleep for *SleepTime duration in Task by recSleepTime per record
+ mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+ reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
+ Job job = createJob(numMapper, numReducer, mapSleepTime,
+ mapSleepCount, reduceSleepTime, reduceSleepCount);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,288 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapreduce.ID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+
+public class MapReduceChildJVM {
+ private static final String SYSTEM_PATH_SEPARATOR =
+ System.getProperty("path.separator");
+
+ private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class);
+
+ private static File getUserLogDir(String baseLogDir) {
+ return new File(baseLogDir, TaskLog.USERLOGS_DIR_NAME);
+ }
+
+ private static File getJobDir(String baseLogDir, JobID jobid) {
+ return new File(getUserLogDir(baseLogDir), jobid.toString());
+ }
+
+ private static File getAttemptDir(String baseLogDir, TaskAttemptID taskid,
+ boolean isCleanup) {
+ String cleanupSuffix = isCleanup ? ".cleanup" : "";
+ return new File(getJobDir(baseLogDir, taskid.getJobID()), taskid
+ + cleanupSuffix);
+ }
+
+ private static File getTaskLogFile(String baseLogDir, TaskAttemptID taskid,
+ boolean isCleanup, LogName filter) {
+ return new File(getAttemptDir(baseLogDir, taskid, isCleanup),
+ filter.toString());
+ }
+
+ private static String getChildEnv(JobConf jobConf, boolean isMap) {
+ if (isMap) {
+ return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV,
+ jobConf.get(JobConf.MAPRED_TASK_ENV));
+ }
+ return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
+        jobConf.get(JobConf.MAPRED_TASK_ENV));
+ }
+
+ public static void setVMEnv(Map<CharSequence,CharSequence> env,
+ List<String> classPaths, String pwd, String nmLdLibraryPath, Task task,
+ CharSequence applicationTokensFile) {
+
+ JobConf conf = task.conf;
+
+ // Add classpath.
+ CharSequence cp = env.get("CLASSPATH");
+ String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+ if (null == cp) {
+ env.put("CLASSPATH", classpath);
+ } else {
+ env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp);
+ }
+
+ /////// Environmental variable LD_LIBRARY_PATH
+ StringBuilder ldLibraryPath = new StringBuilder();
+
+ ldLibraryPath.append(nmLdLibraryPath);
+ ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+ ldLibraryPath.append(pwd);
+ env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ /////// Environmental variable LD_LIBRARY_PATH
+
+ // for the child of task jvm, set hadoop.root.logger
+ env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+
+ // TODO: The following is useful for instance in streaming tasks. Should be
+ // set in ApplicationMaster's env by the RM.
+ String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+ if (hadoopClientOpts == null) {
+ hadoopClientOpts = "";
+ } else {
+ hadoopClientOpts = hadoopClientOpts + " ";
+ }
+    // FIXME: this may not be needed given we already set the java
+    // properties.
+ long logSize = TaskLog.getTaskLogLength(conf);
+ hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + task.getTaskID()
+ + " -Dhadoop.tasklog.iscleanup=" + task.isTaskCleanupTask()
+ + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+ env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+
+ // add the env variables passed by the user
+ String mapredChildEnv = getChildEnv(conf, task.isMapTask());
+ if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+ String childEnvs[] = mapredChildEnv.split(",");
+ for (String cEnv : childEnvs) {
+ String[] parts = cEnv.split("="); // split on '='
+ String value = (String) env.get(parts[0]);
+ if (value != null) {
+ // replace $env with the child's env constructed by tt's
+ // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // this key is not configured by the tt for the child .. get it
+ // from the tt's env
+ // example PATH=$PATH:/tmp
+ value = System.getenv(parts[0]); // Get from NM?
+ if (value != null) {
+ // the env key is present in the tt's env
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+          // the env key is not present anywhere .. simply set it
+ // example X=$X:/tmp or X=/tmp
+ value = parts[1].replace("$" + parts[0], "");
+ }
+ }
+ env.put(parts[0], value);
+ }
+ }
+
+ // TODO: Put a random pid in env for now.
+ // Long term we will need to get it from the Child
+ env.put("JVM_PID", "12344");
+
+ env.put(Constants.HADOOP_WORK_DIR, "."); // This should work. TODO: Find
+ // why the var is introduced. Not
+ // used in tests, for e.g.
+ }
+
+ private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
+ if (isMapTask) {
+ return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get(
+ JobConf.MAPRED_TASK_JAVA_OPTS,
+ JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+ }
+ return jobConf
+ .get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, jobConf.get(
+ JobConf.MAPRED_TASK_JAVA_OPTS,
+ JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
+ }
+
+ private static void setupLog4jProperties(Vector<CharSequence> vargs,
+ long logSize, String hadoopLogDir, Task task) {
+ vargs.add("-Dhadoop.log.dir=" + hadoopLogDir);
+ vargs.add("-Dhadoop.root.logger=DEBUG,TLA");
+ vargs.add("-Dhadoop.tasklog.taskid=" + task.getTaskID());
+ vargs.add("-Dhadoop.tasklog.iscleanup=" + task.isTaskCleanupTask());
+ vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+ }
+
+ public static List<CharSequence> getVMCommand(
+ InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome,
+ String workDir, String logDir, String childTmpDir, ID jvmID) {
+
+ TaskAttemptID attemptID = task.getTaskID();
+ JobConf conf = task.conf;
+
+ Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+
+ vargs.add(javaHome + "/bin/java");
+
+ // Add child (task) java-vm options.
+ //
+ // The following symbols if present in mapred.{map|reduce}.child.java.opts
+ // value are replaced:
+ // + @taskid@ is interpolated with value of TaskID.
+ // Other occurrences of @ will not be altered.
+ //
+ // Example with multiple arguments and substitutions, showing
+ // jvm GC logging, and start of a passwordless JVM JMX agent so can
+ // connect with jconsole and the likes to watch child memory, threads
+ // and get thread dumps.
+ //
+ // <property>
+ // <name>mapred.map.child.java.opts</name>
+  //    <value>-Xmx512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+ // -Dcom.sun.management.jmxremote.authenticate=false \
+ // -Dcom.sun.management.jmxremote.ssl=false \
+ // </value>
+ // </property>
+ //
+ // <property>
+ // <name>mapred.reduce.child.java.opts</name>
+  //    <value>-Xmx1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+ // -Dcom.sun.management.jmxremote.authenticate=false \
+ // -Dcom.sun.management.jmxremote.ssl=false \
+ // </value>
+ // </property>
+ //
+ String javaOpts = getChildJavaOpts(conf, task.isMapTask());
+ javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
+ String [] javaOptsSplit = javaOpts.split(" ");
+
+ // Add java.library.path; necessary for loading native libraries.
+ //
+    // 1. We add the 'cwd' of the task to its java.library.path to help
+ // users distribute native libraries via the DistributedCache.
+ // 2. The user can also specify extra paths to be added to the
+ // java.library.path via mapred.{map|reduce}.child.java.opts.
+ //
+ String libraryPath = workDir;
+ boolean hasUserLDPath = false;
+ for(int i=0; i<javaOptsSplit.length ;i++) {
+ if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+ // TODO: Does the above take care of escaped space chars
+ javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+ hasUserLDPath = true;
+ break;
+ }
+ }
+ if(!hasUserLDPath) {
+ vargs.add("-Djava.library.path=" + libraryPath);
+ }
+ for (int i = 0; i < javaOptsSplit.length; i++) {
+ vargs.add(javaOptsSplit[i]);
+ }
+
+ if (childTmpDir != null) {
+ vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+ }
+
+ // Set up the log4j properties
+ long logSize = TaskLog.getTaskLogLength(conf);
+ setupLog4jProperties(vargs, logSize, logDir, task);
+
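+ // If profiling is enabled and this task's partition is in the configured
+ // range, expand the user's profile parameters; the '%s' in that string is
+ // replaced with the profile output file via String.format below.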
+ if (conf.getProfileEnabled()) {
+ if (conf.getProfileTaskRange(task.isMapTask()
+ ).isIncluded(task.getPartition())) {
+ File prof = getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+ TaskLog.LogName.PROFILE);
+ vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+ }
+ }
+
+ // Add main class and its arguments
+ vargs.add(YarnChild.class.getName()); // main of Child
+ // pass TaskAttemptListener's address
+ vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
+ vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
+ vargs.add(attemptID.toString()); // pass task identifier
+ // pass task log location
+ // TODO: The following API uses system property hadoop.log.dir
+ String attemptLogDir = getAttemptDir(logDir, attemptID, task.isTaskCleanupTask()).toString();
+ vargs.add(attemptLogDir);
+
+ // Finally add the jvmID
+ vargs.add(String.valueOf(jvmID.getId()));
+ vargs.add("1>"
+ + getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+ TaskLog.LogName.STDERR));
+ vargs.add("2>"
+ + getTaskLogFile(logDir, attemptID, task.isTaskCleanupTask(),
+ TaskLog.LogName.STDOUT));
+
+ // Final commmand
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ Vector<CharSequence> vargsFinal = new Vector<CharSequence>(8);
+ vargsFinal.add("mkdir work; mkdir -p " + attemptLogDir + "; "
+ + mergedCommand.toString());
+ return vargsFinal;
+ }
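+
+ // Illustrative only: the single merged line returned above has roughly
+ // the shape
+ //
+ //   mkdir work; mkdir -p <attemptLogDir>; \
+ //   <javaHome>/bin/java -Djava.library.path=<workDir> <javaOpts> \
+ //   YarnChild <listenerHost> <listenerPort> <attemptID> <attemptLogDir> \
+ //   <jvmId> 1><stdout-log> 2><stderr-log>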
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,68 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+public class MapTaskAttemptImpl extends TaskAttemptImpl {
+
+ private final TaskSplitMetaInfo splitInfo;
+
+ public MapTaskAttemptImpl(TaskID taskId, int attempt,
+ EventHandler eventHandler, Path jobFile,
+ int partition, TaskSplitMetaInfo splitInfo, Configuration conf,
+ TaskAttemptListener taskAttemptListener,
+ OutputCommitter committer, Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
+ super(taskId, attempt, eventHandler,
+ taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
+ committer, jobToken, fsTokens);
+ this.splitInfo = splitInfo;
+ }
+
+ @Override
+ public Task createRemoteTask() {
+ // The job file name is set in TaskAttempt, so pass null here.
+ MapTask mapTask =
+ new MapTask(null, TypeConverter.fromYarn(getID()), partition,
+ splitInfo.getSplitIndex(), 1); // YARN has no concept of slots per task; use 1.
+ mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
+ mapTask.setConf(conf);
+ return mapTask;
+ }
+
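+ // Relative request priority for this attempt type: map attempts use 1,
+ // reduce attempts (see ReduceTaskAttemptImpl) use 2.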
+ @Override
+ protected int getPriority() {
+ return 1;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,69 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+
+
+public class ReduceTaskAttemptImpl extends TaskAttemptImpl {
+
+ private final int numMapTasks;
+
+ public ReduceTaskAttemptImpl(TaskID id, int attempt,
+ EventHandler eventHandler, Path jobFile, int partition,
+ int numMapTasks, Configuration conf,
+ TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens) {
+ super(id, attempt, eventHandler, taskAttemptListener, jobFile, partition,
+ conf, new String[] {}, committer, jobToken, fsTokens);
+ this.numMapTasks = numMapTasks;
+ }
+
+ @Override
+ public Task createRemoteTask() {
+ // The job file name is set in TaskAttempt, so pass null here.
+ ReduceTask reduceTask =
+ new ReduceTask(null, TypeConverter.fromYarn(getID()), partition,
+ numMapTasks, 1); // YARN has no concept of slots per task; use 1.
+ reduceTask.setUser(conf.get(MRJobConfig.USER_NAME));
+ reduceTask.setConf(conf);
+ return reduceTask;
+ }
+
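+ // Reduce attempts use priority 2; map attempts use 1 (see
+ // MapTaskAttemptImpl).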
+ @Override
+ protected int getPriority() {
+ return 2;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * This class is responsible for talking to the task umbilical.
+ * It also converts all the old data structures
+ * to yarn data structures.
+ *
+ * This class HAS to be in this package to access package-private
+ * methods/classes.
+ */
+public class TaskAttemptListenerImpl extends CompositeService
+ implements TaskUmbilicalProtocol, TaskAttemptListener {
+
+ private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class);
+
+ private AppContext context;
+ private Server server;
+ private TaskHeartbeatHandler taskHeartbeatHandler;
+ private InetSocketAddress address;
+ private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToAttemptMap =
+ Collections.synchronizedMap(new HashMap<WrappedJvmID,
+ org.apache.hadoop.mapred.Task>());
+ private JobTokenSecretManager jobTokenSecretManager = null;
+
+ public TaskAttemptListenerImpl(AppContext context,
+ JobTokenSecretManager jobTokenSecretManager) {
+ super(TaskAttemptListenerImpl.class.getName());
+ this.context = context;
+ this.jobTokenSecretManager = jobTokenSecretManager;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ registerHeartbeatHandler();
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ startRpcServer();
+ super.start();
+ }
+
+ protected void registerHeartbeatHandler() {
+ taskHeartbeatHandler = new TaskHeartbeatHandler(context.getEventHandler());
+ addService(taskHeartbeatHandler);
+ }
+
+ protected void startRpcServer() {
+ Configuration conf = getConfig();
+ try {
+ server =
+ RPC.getServer(TaskUmbilicalProtocol.class, this, "0.0.0.0", 0, 1,
+ false, conf, jobTokenSecretManager);
+ server.start();
+ InetSocketAddress listenerAddress = server.getListenerAddress();
+ // InetAddress.getLocalHost() is static; call it directly rather than
+ // through the instance returned by getAddress().
+ this.address =
+ NetUtils.createSocketAddr(InetAddress.getLocalHost()
+ .getCanonicalHostName()
+ + ":" + listenerAddress.getPort());
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopRpcServer();
+ super.stop();
+ }
+
+ protected void stopRpcServer() {
+ server.stop();
+ }
+
+ @Override
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+
+ /**
+ * Child checking whether it can commit.
+ *
+ * <br/>
+ * Commit is a two-phased protocol. First the attempt informs the
+ * ApplicationMaster that it is
+ * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly
+ * polls the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}.
+ * This is a legacy from the centralized commit protocol handling by the
+ * JobTracker.
+ */
+ @Override
+ public boolean canCommit(TaskAttemptID taskAttemptID) throws IOException {
+ LOG.info("Commit go/no-go request from " + taskAttemptID.toString());
+ // An attempt is asking if it can commit its output. This can be decided
+ // only by the task which is managing the multiple attempts. So redirect the
+ // request there.
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+
+ taskHeartbeatHandler.receivedPing(attemptID);
+
+ Job job = context.getJob(attemptID.taskID.jobID);
+ Task task = job.getTask(attemptID.taskID);
+ return task.canCommit(attemptID);
+ }
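+
+ // A minimal sketch (illustrative only, not part of this patch) of how a
+ // child attempt drives the two-phase commit, assuming an 'umbilical'
+ // proxy to this listener and a hypothetical commitTask() helper:
+ //
+ //   umbilical.commitPending(attemptID, taskStatus);
+ //   while (!umbilical.canCommit(attemptID)) {
+ //     Thread.sleep(1000);   // poll until the AM decides go/no-go
+ //   }
+ //   commitTask();           // actually commit the task output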
+
+ /**
+ * TaskAttempt is reporting that it is in commit_pending and is waiting
+ * for the commit response.
+ *
+ * <br/>
+ * Commit is a two-phased protocol. First the attempt informs the
+ * ApplicationMaster that it is
+ * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly
+ * polls the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}.
+ * This is a legacy from the centralized commit protocol handling by the
+ * JobTracker.
+ */
+ @Override
+ public void commitPending(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ LOG.info("Commit-pending state update from " + taskAttemptID.toString());
+ // The attempt is reporting that it is in the commit-pending state; notify
+ // the attempt's state machine via a TA_COMMIT_PENDING event.
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+
+ taskHeartbeatHandler.receivedPing(attemptID);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_COMMIT_PENDING));
+ }
+
+ @Override
+ public void done(TaskAttemptID taskAttemptID) throws IOException {
+ LOG.info("Done acknowledgement from " + taskAttemptID.toString());
+
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+
+ taskHeartbeatHandler.receivedPing(attemptID);
+
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+ }
+
+ @Override
+ public void fatalError(TaskAttemptID taskAttemptID, String msg)
+ throws IOException {
+ // This happens only in Child and in the Task.
+ LOG.fatal("Task: " + taskAttemptID + " - exited : " + msg);
+ reportDiagnosticInfo(taskAttemptID, "Error: " + msg);
+
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ }
+
+ @Override
+ public void fsError(TaskAttemptID taskAttemptID, String message)
+ throws IOException {
+ // This happens only in Child.
+ LOG.fatal("Task: " + taskAttemptID + " - failed due to FSError: "
+ + message);
+ reportDiagnosticInfo(taskAttemptID, "FSError: " + message);
+
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ }
+
+ @Override
+ public void shuffleError(TaskAttemptID taskAttemptID, String message) throws IOException {
+ // TODO: This isn't really used in any MR code. Ask for removal.
+ }
+
+ @Override
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(
+ JobID jobIdentifier, int fromEventId, int maxEvents,
+ TaskAttemptID taskAttemptID) throws IOException {
+ LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
+ + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+
+ // TODO: shouldReset is never used. See TT. Ask for Removal.
+ boolean shouldReset = false;
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] events =
+ context.getJob(attemptID.taskID.jobID).getTaskAttemptCompletionEvents(
+ fromEventId, maxEvents);
+
+ taskHeartbeatHandler.receivedPing(attemptID);
+
+ //filter the events to return only map completion events in old format
+ List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
+ for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent event : events) {
+ if (TaskType.MAP.equals(event.attemptId.taskID.taskType)) {
+ mapEvents.add(TypeConverter.fromYarn(event));
+ }
+ }
+
+ return new MapTaskCompletionEventsUpdate(
+ mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+ }
+
+ @Override
+ public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
+ LOG.info("Ping from " + taskAttemptID.toString());
+ taskHeartbeatHandler.receivedPing(TypeConverter.toYarn(taskAttemptID));
+ return true;
+ }
+
+ @Override
+ public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
+ throws IOException {
+ LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
+ + diagnosticInfo);
+
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID =
+ TypeConverter.toYarn(taskAttemptID);
+ taskHeartbeatHandler.receivedPing(attemptID);
+
+ // This is mainly used for cases where we want to propagate exception traces
+ // of tasks that fail.
+
+ // This call exists as a hadoop mapreduce legacy wherein all changes in
+ // counters/progress/phase/output-size are reported through statusUpdate()
+ // call but not diagnosticInformation.
+ context.getEventHandler().handle(
+ new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnosticInfo));
+ }
+
+ @Override
+ public boolean statusUpdate(TaskAttemptID taskAttemptID,
+ TaskStatus taskStatus) throws IOException, InterruptedException {
+ LOG.info("Status update from " + taskAttemptID.toString());
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID yarnAttemptID =
+ TypeConverter.toYarn(taskAttemptID);
+ taskHeartbeatHandler.receivedPing(yarnAttemptID);
+ TaskAttemptStatus taskAttemptStatus =
+ new TaskAttemptStatus();
+ taskAttemptStatus.id = yarnAttemptID;
+ // Task sends the updated progress to the TT.
+ taskAttemptStatus.progress = taskStatus.getProgress();
+ LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+ + taskStatus.getProgress());
+ // Task sends the diagnostic information to the TT
+ taskAttemptStatus.diagnosticInfo = taskStatus.getDiagnosticInfo();
+ // Task sends the updated state-string to the TT.
+ taskAttemptStatus.stateString = taskStatus.getStateString();
+ // Set the output-size when map-task finishes. Set by the task itself.
+ taskAttemptStatus.outputSize = taskStatus.getOutputSize();
+ // Task sends the updated phase to the TT.
+ taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
+ // Counters are updated by the task.
+ taskAttemptStatus.counters =
+ TypeConverter.toYarn(taskStatus.getCounters());
+
+ //set the fetch failures
+ if (taskStatus.getFetchFailedMaps() != null
+ && taskStatus.getFetchFailedMaps().size() > 0) {
+ taskAttemptStatus.fetchFailedMaps =
+ new ArrayList<org.apache.hadoop.mapreduce.v2.api.TaskAttemptID>();
+ for (TaskAttemptID failedMapId : taskStatus.getFetchFailedMaps()) {
+ taskAttemptStatus.fetchFailedMaps.add(
+ TypeConverter.toYarn(failedMapId));
+ }
+ }
+
+ // Task sends the information about the nextRecordRange to the TT
+
+// TODO: The following are not needed here, but needed to be set somewhere inside AppMaster.
+// taskStatus.getRunState(); // Set by the TT/JT. Transform into a state TODO
+// taskStatus.getStartTime(); // Used to be set by the TaskTracker. This should be set by getTask().
+// taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set when task finishes
+// // This was used by TT to do counter updates only once every minute. So this
+// // isn't ever changed by the Task itself.
+// taskStatus.getIncludeCounters();
+
+ context.getEventHandler().handle(
+ new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
+ taskAttemptStatus));
+ return true;
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return TaskUmbilicalProtocol.versionID;
+ }
+
+ @Override
+ public void reportNextRecordRange(TaskAttemptID taskAttemptID, Range range)
+ throws IOException {
+ // This is used when the feature of skipping records is enabled.
+
+ // This call exists as a hadoop mapreduce legacy wherein all changes in
+ // counters/progress/phase/output-size are reported through statusUpdate()
+ // call but not the next record range information.
+ throw new IOException("Not yet implemented.");
+ }
+
+ @Override
+ public JvmTask getTask(JvmContext context) throws IOException {
+
+ // A rough imitation of code from TaskTracker.
+
+ JVMId jvmId = context.jvmId;
+ LOG.info("JVM with ID : " + jvmId + " asked for a task");
+
+ // TODO: Is it an authorised container to get a task? Otherwise return null.
+
+ // TODO: Is the request for task-launch still valid?
+
+ // TODO: Child.java's firstTaskID isn't really firstTaskID. Ask for update
+ // to jobId and task-type.
+
+ WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
+ jvmId.getId());
+ org.apache.hadoop.mapred.Task task = jvmIDToAttemptMap.get(wJvmID);
+ if (task != null) { //there may be lag in the attempt getting added here
+ LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+ JvmTask jvmTask = new JvmTask(task, false);
+
+ //remove the task as it is no more needed and free up the memory
+ jvmIDToAttemptMap.remove(wJvmID);
+
+ return jvmTask;
+ }
+ return new JvmTask(null, false);
+ }
+
+ @Override
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId, long[] sizes) throws IOException {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void register(org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID,
+ org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
+ //create the mapping so that it is easy to look up
+ //when it comes back to ask for Task.
+ jvmIDToAttemptMap.put(jvmID, task);
+ //register this attempt
+ taskHeartbeatHandler.register(attemptID);
+ }
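+
+ // Illustrative lifecycle (not part of this patch): the AM registers the
+ // attempt and its JVM here before the container starts; the child then
+ // calls getTask(JvmContext) over the umbilical to pick up the Task; and
+ // unregister() below removes the mapping once the attempt completes.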
+
+ @Override
+ public void unregister(org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID,
+ WrappedJvmID jvmID) {
+ //remove the mapping if not already removed
+ jvmIDToAttemptMap.remove(jvmID);
+
+ //unregister this attempt
+ taskHeartbeatHandler.unregister(attemptID);
+ }
+
+}