Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/28 13:27:38 UTC
svn commit: r769339 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/io/ src/examples/org/apache/hadoop/examples/
src/test/org/apache/hadoop/mapred/
Author: sharad
Date: Tue Apr 28 11:27:37 2009
New Revision: 769339
URL: http://svn.apache.org/viewvc?rev=769339&view=rev
Log:
HADOOP-5699. Change PiEstimator to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 28 11:27:37 2009
@@ -34,6 +34,12 @@
HADOOP-5681. Change examples RandomWriter and RandomTextWriter to
use new mapreduce API. (Amareshwari Sriramadasu via sharad)
+ HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new
+ mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+ HADOOP-5699. Change org.apache.hadoop.examples.PiEstimator to use
+ new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
NEW FEATURES
HADOOP-4268. Change fsck to use ClientProtocol methods so that the
@@ -256,9 +262,6 @@
HADOOP-5589. Eliminate source limit of 64 for map-side joins imposed by
TupleWritable encoding. (Jingkei Ly via cdouglas)
- HADOOP-5680. Change org.apache.hadoop.examples.SleepJob to use new
- mapreduce api. (Amareshwari Sriramadasu via sharad)
-
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/BooleanWritable.java Tue Apr 28 11:27:37 2009
@@ -100,9 +100,7 @@
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
- boolean a = (readInt(b1, s1) == 1) ? true : false;
- boolean b = (readInt(b2, s2) == 1) ? true : false;
- return ((a == b) ? 0 : (a == false) ? -1 : 1);
+ return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
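This rewrite is both simpler and safer: BooleanWritable serializes as a single byte (0x00 for false, 0x01 for true), so the old readInt-based comparison read past the serialized value, while plain lexicographic byte comparison preserves the natural false < true order. A minimal standalone sketch of the raw comparison; the class name RawBooleanCompareDemo is illustrative only, not part of the commit:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.WritableComparator;

    public class RawBooleanCompareDemo {
      // Serialize a BooleanWritable to its raw on-the-wire form: one byte.
      static byte[] serialize(boolean value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new BooleanWritable(value).write(new DataOutputStream(bytes));
        return bytes.toByteArray();
      }

      public static void main(String[] args) throws IOException {
        byte[] f = serialize(false); // {0x00}
        byte[] t = serialize(true);  // {0x01}
        // Lexicographic byte comparison gives false < true without
        // deserializing, which is what compareBytes provides.
        int cmp = WritableComparator.compareBytes(f, 0, f.length, t, 0, t.length);
        System.out.println(cmp < 0); // prints true
      }
    }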
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Tue Apr 28 11:27:37 2009
@@ -20,8 +20,8 @@
import java.io.IOException;
import java.math.BigDecimal;
-import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,17 +31,11 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -137,19 +131,18 @@
* Generate points in a unit square
* and then count points inside/outside of the inscribed circle of the square.
*/
- public static class PiMapper extends MapReduceBase
- implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
+ public static class PiMapper extends
+ Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
/** Map method.
* @param offset samples starting from the (offset+1)th sample.
* @param size the number of samples for this map
- * @param out output {true->numInside, false->numOutside}
- * @param reporter
+ * @param context output {true->numInside, false->numOutside}
*/
public void map(LongWritable offset,
LongWritable size,
- OutputCollector<BooleanWritable, LongWritable> out,
- Reporter reporter) throws IOException {
+ Context context)
+ throws IOException, InterruptedException {
final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
long numInside = 0L;
@@ -171,13 +164,13 @@
//report status
i++;
if (i % 1000 == 0) {
- reporter.setStatus("Generated " + i + " samples.");
+ context.setStatus("Generated " + i + " samples.");
}
}
//output map results
- out.collect(new BooleanWritable(true), new LongWritable(numInside));
- out.collect(new BooleanWritable(false), new LongWritable(numOutside));
+ context.write(new BooleanWritable(true), new LongWritable(numInside));
+ context.write(new BooleanWritable(false), new LongWritable(numOutside));
}
}
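The counting step above is the heart of the Monte Carlo estimate: the circle inscribed in the unit square covers pi/4 of its area, so 4 * numInside / numTotal converges to pi. A self-contained sketch of the same computation, substituting java.util.Random for the HaltonSequence used by the real mapper (class name and seed are illustrative):

    import java.util.Random;

    public class PiMonteCarloSketch {
      public static void main(String[] args) {
        final long samples = 1000000L;
        final Random rng = new Random(42);
        long numInside = 0;
        for (long i = 0; i < samples; i++) {
          // Sample a point in the unit square centered on (0.5, 0.5).
          double x = rng.nextDouble() - 0.5;
          double y = rng.nextDouble() - 0.5;
          // Inside the inscribed circle of radius 0.5?
          if (x * x + y * y <= 0.25) {
            numInside++;
          }
        }
        // Area ratio: (pi * 0.5^2) / 1 = pi / 4.
        System.out.println("Estimated Pi = " + 4.0 * numInside / samples);
      }
    }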
@@ -185,34 +178,29 @@
* Reducer class for Pi estimation.
* Accumulate points inside/outside results from the mappers.
*/
- public static class PiReducer extends MapReduceBase
- implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
+ public static class PiReducer extends
+ Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
private long numInside = 0;
private long numOutside = 0;
- private JobConf conf; //configuration for accessing the file system
- /** Store job configuration. */
- @Override
- public void configure(JobConf job) {
- conf = job;
- }
-
/**
* Accumulate number of points inside/outside results from the mappers.
* @param isInside Are the points inside?
* @param values An iterator to a list of point counts
- * @param output dummy, not used here.
- * @param reporter
+ * @param context dummy, not used here.
*/
public void reduce(BooleanWritable isInside,
- Iterator<LongWritable> values,
- OutputCollector<WritableComparable<?>, Writable> output,
- Reporter reporter) throws IOException {
+ Iterable<LongWritable> values, Context context)
+ throws IOException, InterruptedException {
if (isInside.get()) {
- for(; values.hasNext(); numInside += values.next().get());
+ for (LongWritable val : values) {
+ numInside += val.get();
+ }
} else {
- for(; values.hasNext(); numOutside += values.next().get());
+ for (LongWritable val : values) {
+ numOutside += val.get();
+ }
}
}
@@ -220,10 +208,11 @@
* Reduce task done, write output to a file.
*/
@Override
- public void close() throws IOException {
+ public void cleanup(Context context) throws IOException {
//write output to a file
Path outDir = new Path(TMP_DIR, "out");
Path outFile = new Path(outDir, "reduce-out");
+ Configuration conf = context.getConfiguration();
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
outFile, LongWritable.class, LongWritable.class,
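In the new API, cleanup(Context) absorbs both halves of the old configure(JobConf)/close() pair: the Configuration travels on the Context, and cleanup runs exactly once after the final reduce() call, which is why PiReducer no longer needs to cache a JobConf field. A minimal reducer illustrating that lifecycle; SummingReducer is a hypothetical name, not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BooleanWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SummingReducer extends
        Reducer<BooleanWritable, LongWritable, BooleanWritable, LongWritable> {
      private long total = 0;

      @Override
      protected void reduce(BooleanWritable key, Iterable<LongWritable> values,
          Context context) throws IOException, InterruptedException {
        // Iterable replaces the old Iterator parameter, enabling for-each.
        for (LongWritable v : values) {
          total += v.get();
        }
      }

      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        // Runs once after all reduce() calls; the Configuration that
        // configure(JobConf) used to capture is available on the Context.
        Configuration conf = context.getConfiguration();
        context.write(new BooleanWritable(true), new LongWritable(total));
      }
    }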
@@ -238,34 +227,35 @@
*
* @return the estimated value of Pi
*/
- public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
- ) throws IOException {
+ public static BigDecimal estimate(int numMaps, long numPoints, Configuration conf
+ ) throws IOException, ClassNotFoundException, InterruptedException {
+ Job job = new Job(conf);
//setup job conf
- jobConf.setJobName(PiEstimator.class.getSimpleName());
+ job.setJobName(PiEstimator.class.getSimpleName());
+ job.setJarByClass(PiEstimator.class);
- jobConf.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
- jobConf.setOutputKeyClass(BooleanWritable.class);
- jobConf.setOutputValueClass(LongWritable.class);
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(BooleanWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
- jobConf.setMapperClass(PiMapper.class);
- jobConf.setNumMapTasks(numMaps);
+ job.setMapperClass(PiMapper.class);
- jobConf.setReducerClass(PiReducer.class);
- jobConf.setNumReduceTasks(1);
+ job.setReducerClass(PiReducer.class);
+ job.setNumReduceTasks(1);
// turn off speculative execution, because DFS doesn't handle
// multiple writers to the same file.
- jobConf.setSpeculativeExecution(false);
+ job.setSpeculativeExecution(false);
//setup input/output directories
final Path inDir = new Path(TMP_DIR, "in");
final Path outDir = new Path(TMP_DIR, "out");
- FileInputFormat.setInputPaths(jobConf, inDir);
- FileOutputFormat.setOutputPath(jobConf, outDir);
+ FileInputFormat.setInputPaths(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
- final FileSystem fs = FileSystem.get(jobConf);
+ final FileSystem fs = FileSystem.get(conf);
if (fs.exists(TMP_DIR)) {
throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ " already exists. Please remove it first.");
@@ -281,7 +271,7 @@
final LongWritable offset = new LongWritable(i * numPoints);
final LongWritable size = new LongWritable(numPoints);
final SequenceFile.Writer writer = SequenceFile.createWriter(
- fs, jobConf, file,
+ fs, conf, file,
LongWritable.class, LongWritable.class, CompressionType.NONE);
try {
writer.append(offset, size);
@@ -294,7 +284,7 @@
//start a map/reduce job
System.out.println("Starting Job");
final long startTime = System.currentTimeMillis();
- JobClient.runJob(jobConf);
+ job.waitForCompletion(true);
final double duration = (System.currentTimeMillis() - startTime)/1000.0;
System.out.println("Job Finished in " + duration + " seconds");
@@ -302,7 +292,7 @@
Path inFile = new Path(outDir, "reduce-out");
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, conf);
try {
reader.next(numInside, numOutside);
} finally {
@@ -329,7 +319,7 @@
if (args.length != 2) {
System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
ToolRunner.printGenericCommandUsage(System.err);
- return -1;
+ return 2;
}
final int nMaps = Integer.parseInt(args[0]);
@@ -338,9 +328,8 @@
System.out.println("Number of Maps = " + nMaps);
System.out.println("Samples per Map = " + nSamples);
- final JobConf jobConf = new JobConf(getConf(), getClass());
System.out.println("Estimated value of Pi is "
- + estimate(nMaps, nSamples, jobConf));
+ + estimate(nMaps, nSamples, getConf()));
return 0;
}
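The Tool plumbing is unchanged, so the example is still launched through ToolRunner (note the usage-error exit code is now 2 rather than -1). A hypothetical standalone launcher equivalent to the example's own entry point; RunPi is an illustrative name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class RunPi {
      public static void main(String[] args) throws Exception {
        // e.g. args = {"10", "100000"}: 10 maps, 100000 samples per map.
        // ToolRunner parses generic options (-D, -fs, ...) and then calls
        // PiEstimator.run() with the remaining arguments.
        int exitCode = ToolRunner.run(new Configuration(),
            new org.apache.hadoop.examples.PiEstimator(), args);
        System.exit(exitCode);
      }
    }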
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestComparators.java Tue Apr 28 11:27:37 2009
@@ -19,7 +19,6 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.BooleanWritable.Comparator;
import junit.framework.TestCase;
import java.io.*;
import java.util.*;
@@ -237,7 +236,8 @@
return -super.compare(b1, s1, l1, b2, s2, l2);
}
static { // register this comparator
- WritableComparator.define(DecreasingIntComparator.class, new Comparator());
+ WritableComparator.define(DecreasingIntComparator.class,
+ new IntWritable.Comparator());
}
}
@@ -268,7 +268,8 @@
}
static {
- WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
+ WritableComparator.define(CompositeIntGroupFn.class,
+ new IntWritable.Comparator());
}
}
@@ -284,7 +285,8 @@
}
static {
- WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
+ WritableComparator.define(CompositeIntReverseGroupFn.class,
+ new IntWritable.Comparator());
}
}
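The TestComparators hunks fix a latent ambiguity: once the BooleanWritable.Comparator import is gone, each bare "new Comparator()" (which had been constructing a comparator for the wrong key type) is replaced by an explicit IntWritable.Comparator. For reference, the registration pattern these tests rely on looks roughly like this; DescendingInt is a hypothetical key type, not from the test:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparator;

    public class DescendingInt extends IntWritable {
      /** Raw comparator inverting the natural IntWritable order. */
      public static class Comparator extends IntWritable.Comparator {
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
          return -super.compare(b1, s1, l1, b2, s2, l2);
        }
      }

      static {
        // Register the raw comparator for this key class. Registering an
        // unrelated Writable's comparator here compiles fine but mis-sorts
        // keys at runtime; that is the bug the hunks above guard against.
        WritableComparator.define(DescendingInt.class, new Comparator());
      }
    }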
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Tue Apr 28 11:27:37 2009
@@ -45,7 +45,8 @@
new File(System.getProperty("test.build.data","/tmp"))
.toURI().toString().replace(' ', '+');
- public void testWithLocal() throws IOException {
+ public void testWithLocal()
+ throws IOException, InterruptedException, ClassNotFoundException {
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(2, "file:///", 3);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Apr 28 11:27:37 2009
@@ -169,7 +169,8 @@
}
}
- public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+ public static void runPI(MiniMRCluster mr, JobConf jobconf)
+ throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("runPI");
double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
@@ -232,7 +233,8 @@
}
}
- public void testWithDFS() throws IOException {
+ public void testWithDFS()
+ throws IOException, InterruptedException, ClassNotFoundException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=769339&r1=769338&r2=769339&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Tue Apr 28 11:27:37 2009
@@ -55,7 +55,8 @@
}
}
- static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+ static void runPI(MiniMRCluster mr, JobConf jobconf)
+ throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("runPI");
double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
double error = Math.abs(Math.PI - estimate);
@@ -66,7 +67,8 @@
* Run the pi test with a specific value of
* mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
*/
- private boolean runOneTest(int maxTasks) throws IOException {
+ private boolean runOneTest(int maxTasks)
+ throws IOException, InterruptedException, ClassNotFoundException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
@@ -96,7 +98,8 @@
return success;
}
- public void testTaskLimits() throws IOException {
+ public void testTaskLimits()
+ throws IOException, InterruptedException, ClassNotFoundException {
System.out.println("Job 1 running with max set to 2");
boolean status = runOneTest(2);