You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by en...@apache.org on 2007/09/13 08:31:51 UTC
svn commit: r575188 - in /lucene/hadoop/trunk: CHANGES.txt
src/examples/org/apache/hadoop/examples/ExampleDriver.java
src/examples/org/apache/hadoop/examples/SleepJob.java
Author: enis
Date: Wed Sep 12 23:31:51 2007
New Revision: 575188
URL: http://svn.apache.org/viewvc?rev=575188&view=rev
Log:
HADOOP-1880. SleepJob: An example job that sleeps at each map and reduce task.
Added:
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=575188&r1=575187&r2=575188&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 12 23:31:51 2007
@@ -53,6 +53,9 @@
HADOOP-1351. Add "bin/hadoop job [-fail-task|-kill-task]" sub-commands
to terminate a particular task-attempt. (Enis Soztutar via acmurthy)
+ HADOOP-1880. SleepJob : An example job that sleeps at each map and
+ reduce task. (enis)
+
OPTIMIZATIONS
HADOOP-1565. Reduce memory usage of NameNode by replacing
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=575188&r1=575187&r2=575188&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Sep 12 23:31:51 2007
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.examples;
+import org.apache.hadoop.examples.dancing.DistributedPentomino;
+import org.apache.hadoop.examples.dancing.Sudoku;
import org.apache.hadoop.util.ProgramDriver;
-import org.apache.hadoop.examples.dancing.*;
/**
* A description of an example program based on its class and a
@@ -42,6 +43,7 @@
pgd.addClass("pentomino", DistributedPentomino.class,
"A map/reduce tile laying program to find solutions to pentomino problems.");
pgd.addClass("sudoku", Sudoku.class, "A sudoku solver.");
+ pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task.");
pgd.driver(argv);
}
catch(Throwable e){
Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=575188&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Wed Sep 12 23:31:51 2007
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Dummy class for testing the MR framework. Sleeps for a defined period
+ * of time in mapper and reducer. Generates fake input for map / reduce
+ * jobs. Note that generated number of input pairs is in the order
+ * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
+ * some disk space.
+ * <p>
+ * The sleep parameters reach the tasks through the job configuration keys
+ * <code>sleep.job.map.sleep.time</code>, <code>sleep.job.reduce.sleep.time</code>
+ * (milliseconds), <code>sleep.job.map.sleep.count</code> and
+ * <code>sleep.job.reduce.sleep.count</code> (number of slices each sleep is
+ * split into). The generated input is written to the fixed path
+ * <code>/tmp/sleep.job.data</code> on the default file system and deleted
+ * when the job ends, so two SleepJobs sharing a file system must not run
+ * concurrently.
+ */
+public class SleepJob extends Configured implements Tool,
+    Mapper<IntWritable, IntWritable, IntWritable, IntWritable>,
+    Reducer<IntWritable, IntWritable, IntWritable, IntWritable>,
+    Partitioner<IntWritable, IntWritable> {
+
+  private long mapSleepTime = 100;
+  private long reduceSleepTime = 100;
+  private long mapSleepCount = 1;
+  private long reduceSleepCount = 1;
+  private int numReduce;
+
+  // True until map() has handled its first record; the intermediate keys
+  // are emitted only once per mapper instance.
+  private boolean firstRecord = true;
+  // Number of map()/reduce() calls handled so far by this instance; used to
+  // report the remaining sleep time.
+  private long count = 0;
+
+  /**
+   * Assigns a key to a reducer by taking the key modulo the number of
+   * partitions. The sign bit is masked so a negative key can never produce
+   * an invalid (negative) partition number.
+   */
+  public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
+    return (key.get() & Integer.MAX_VALUE) % numPartitions;
+  }
+
+  /**
+   * Sleeps for one slice of the map sleep time. On the first record, emits
+   * <code>reduceSleepCount * numReduce</code> keys so that, under the
+   * key-modulo partitioning of {@link #getPartition}, every reducer receives
+   * <code>reduceSleepCount</code> distinct keys.
+   *
+   * @throws IOException if the sleep is interrupted or collecting output fails
+   */
+  public void map(IntWritable key, IntWritable value,
+      OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
+      throws IOException {
+    // Each map is expected to process about mapSleepCount records, so each
+    // record accounts for an equal slice of the total map sleep time.
+    sleepSlice(mapSleepTime, mapSleepCount, reporter);
+    if (firstRecord) {
+      long numKeys = reduceSleepCount * numReduce;
+      for (int i = 0; i < numKeys; i++) {
+        output.collect(new IntWritable(i), value);
+      }
+      firstRecord = false;
+    }
+  }
+
+  /**
+   * Sleeps for one slice of the reduce sleep time per key. The values are
+   * ignored and nothing is emitted.
+   *
+   * @throws IOException if the sleep is interrupted
+   */
+  public void reduce(IntWritable key, Iterator<IntWritable> values,
+      OutputCollector<IntWritable, IntWritable> output, Reporter reporter)
+      throws IOException {
+    sleepSlice(reduceSleepTime, reduceSleepCount, reporter);
+  }
+
+  /**
+   * Sleeps for <code>totalSleepTime / sleepCount</code> ms, reports the
+   * remaining sleep time as the task status, and counts the call. A
+   * non-positive <code>sleepCount</code> skips the sleep, which avoids a
+   * division by zero when the count is configured directly.
+   */
+  private void sleepSlice(long totalSleepTime, long sleepCount, Reporter reporter)
+      throws IOException {
+    if (sleepCount > 0) {
+      // Guard against a negative configured time: Thread.sleep rejects it.
+      long sliceTime = Math.max(0, totalSleepTime / sleepCount);
+      long left = Math.max(0, sleepCount - count);
+      reporter.setStatus("Sleeping... (" + (sliceTime * left) + ") ms left");
+      try {
+        Thread.sleep(sliceTime);
+      } catch (InterruptedException ie) {
+        // Preserve the interrupt and fail the task instead of silently
+        // carrying on as if the full sleep had happened.
+        Thread.currentThread().interrupt();
+        throw (IOException) new IOException("Interrupted while sleeping").initCause(ie);
+      }
+    }
+    count++;
+  }
+
+  /** Reads the sleep parameters and the number of reduces from the job. */
+  public void configure(JobConf job) {
+    this.mapSleepTime = job.getLong("sleep.job.map.sleep.time", mapSleepTime);
+    this.reduceSleepTime = job.getLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+    this.mapSleepCount = job.getLong("sleep.job.map.sleep.count", mapSleepCount);
+    this.reduceSleepCount = job.getLong("sleep.job.reduce.sleep.count", reduceSleepCount);
+    numReduce = job.getNumReduceTasks();
+  }
+
+  public void close() throws IOException {
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Generates <code>numMapper * mapSleepCount</code> random input records,
+   * runs the sleep job over them and removes the input afterwards, even if
+   * generation or the job fails.
+   *
+   * @param numMapper number of map tasks to request
+   * @param numReducer number of reduce tasks
+   * @param mapSleepTime total sleep time of each map, in ms
+   * @param mapSleepCount number of records (sleep slices) per map
+   * @param reduceSleepTime total sleep time of each reduce, in ms
+   * @param reduceSleepCount number of keys (sleep slices) per reduce
+   * @return 0 once the job has completed
+   * @throws Exception propagated from input generation or job execution
+   */
+  public int run(int numMapper, int numReducer, long mapSleepTime,
+      long mapSleepCount, long reduceSleepTime, long reduceSleepCount)
+      throws Exception {
+    FileSystem fs = FileSystem.get(getConf());
+    Path tempPath = new Path("/tmp/sleep.job.data");
+    try {
+      writeInput(fs, tempPath, numMapper * mapSleepCount);
+
+      JobConf job = new JobConf(getConf(), SleepJob.class);
+      job.setNumMapTasks(numMapper);
+      job.setNumReduceTasks(numReducer);
+      job.setMapperClass(SleepJob.class);
+      job.setMapOutputKeyClass(IntWritable.class);
+      job.setMapOutputValueClass(IntWritable.class);
+      job.setReducerClass(SleepJob.class);
+      // map() relies on key-modulo partitioning to spread keys evenly.
+      job.setPartitionerClass(SleepJob.class);
+      job.setOutputFormat(NullOutputFormat.class);
+      job.setInputFormat(SequenceFileInputFormat.class);
+      job.setSpeculativeExecution(false);
+      job.setJobName("Sleep job");
+      job.addInputPath(tempPath);
+      job.setLong("sleep.job.map.sleep.time", mapSleepTime);
+      job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+      job.setLong("sleep.job.map.sleep.count", mapSleepCount);
+      job.setLong("sleep.job.reduce.sleep.count", reduceSleepCount);
+
+      JobClient.runJob(job);
+    } finally {
+      fs.delete(tempPath);
+    }
+    return 0;
+  }
+
+  /**
+   * Writes <code>numRecords</code> random (IntWritable, IntWritable) pairs
+   * to a SequenceFile at <code>path</code>, always closing the writer.
+   */
+  private void writeInput(FileSystem fs, Path path, long numRecords)
+      throws IOException {
+    Random random = new Random();
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+        path, IntWritable.class, IntWritable.class);
+    try {
+      // long counter: numRecords may exceed Integer.MAX_VALUE.
+      for (long i = 0; i < numRecords; i++) {
+        writer.append(new IntWritable(random.nextInt()),
+            new IntWritable(random.nextInt()));
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Prints the command-line usage to stderr and returns the error code -1. */
+  private static int printUsage() {
+    System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
+        " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)] ");
+    ToolRunner.printGenericCommandUsage(System.err);
+    return -1;
+  }
+
+  /**
+   * Parses the command line and runs the job. Prints usage and returns -1
+   * when no arguments are given, an option is unknown, or an option is
+   * missing its value.
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      return printUsage();
+    }
+
+    int numMapper = 1;
+    int numReducer = 1;
+    long mapSleepTime = 100;
+    long reduceSleepTime = 100;
+
+    for (int i = 0; i < args.length; i++) {
+      String option = args[i];
+      boolean known = option.equals("-m") || option.equals("-r")
+          || option.equals("-mt") || option.equals("-rt");
+      if (!known) {
+        System.err.println("Unknown option: " + option);
+        return printUsage();
+      }
+      if (i + 1 >= args.length) {
+        System.err.println("Missing value for option: " + option);
+        return printUsage();
+      }
+      String value = args[++i];
+      if (option.equals("-m")) {
+        numMapper = Integer.parseInt(value);
+      } else if (option.equals("-r")) {
+        numReducer = Integer.parseInt(value);
+      } else if (option.equals("-mt")) {
+        mapSleepTime = Long.parseLong(value);
+      } else {
+        reduceSleepTime = Long.parseLong(value);
+      }
+    }
+
+    // Split each sleep into slices of about 100 ms (one per map record or
+    // reduce key) so the task status is refreshed regularly.
+    long mapSleepCount = (long) Math.ceil(mapSleepTime / 100.0d);
+    long reduceSleepCount = (long) Math.ceil(reduceSleepTime / 100.0d);
+
+    return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
+        reduceSleepTime, reduceSleepCount);
+  }
+
+}