You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/15 23:56:38 UTC
svn commit: r547807 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: cutting
Date: Fri Jun 15 14:56:37 2007
New Revision: 547807
URL: http://svn.apache.org/viewvc?view=rev&rev=547807
Log:
HADOOP-1320. Rewrite RandomWriter example to bypass reduce. Contributed by Arun.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 15 14:56:37 2007
@@ -147,6 +147,9 @@
46. HADOOP-1417. Disable a few FindBugs checks that generate a lot
of spurious warnings. (Nigel Daley via cutting)
+ 47. HADOOP-1320. Rewrite RandomWriter example to bypass reduce.
+ (Arun C Murthy via cutting)
+
Release 0.13.0 - 2007-06-08
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java Fri Jun 15 14:56:37 2007
@@ -71,6 +71,7 @@
public void testCommandLine()
{
+ String outFileName = "part-00000";
File outFile = null;
try {
try {
@@ -85,7 +86,7 @@
// So don't specify -config or -cluster
job = new StreamJob(genArgs(), mayExit);
job.go();
- outFile = new File(OUTPUT_DIR, "tip_m_map_0000").getAbsoluteFile();
+ outFile = new File(OUTPUT_DIR, outFileName).getAbsoluteFile();
String output = StreamUtil.slurp(outFile);
System.err.println("outEx1=" + outputExpect);
System.err.println(" out1=" + output);
@@ -94,7 +95,7 @@
failTrace(e);
} finally {
outFile.delete();
- File outFileCRC = new File(OUTPUT_DIR, ".tip_m_map_0000.crc").getAbsoluteFile();
+ File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
INPUT_FILE.delete();
outFileCRC.delete();
OUTPUT_DIR.getAbsoluteFile().delete();
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Fri Jun 15 14:56:37 2007
@@ -22,13 +22,12 @@
import java.util.Date;
import java.util.Random;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.ToolBase;
/**
* This program uses map/reduce to just run a distributed job where there is
@@ -61,8 +60,11 @@
* <value>1099511627776</value>
* </property>
* </configuration></xmp>
+ *
+ * Equivalently, {@link RandomWriter} also supports all the above options
+ * and ones supported by {@link ToolBase} via the command-line.
*/
-public class RandomWriter {
+public class RandomWriter extends ToolBase {
/**
* User counters
@@ -88,7 +90,7 @@
InputSplit[] result = new InputSplit[numSplits];
Path outDir = job.getOutputPath();
for(int i=0; i < result.length; ++i) {
- result[i] = new FileSplit(new Path(outDir, "part-" + i), 0, 1, job);
+ result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, job);
}
return result;
}
@@ -133,8 +135,6 @@
}
static class Map extends MapReduceBase implements Mapper {
- private FileSystem fileSys = null;
- private JobConf jobConf = null;
private long numBytesToWrite;
private int minKeySize;
private int keySizeRange;
@@ -143,7 +143,6 @@
private Random random = new Random();
private BytesWritable randomKey = new BytesWritable();
private BytesWritable randomValue = new BytesWritable();
- private Path outputDir = null;
private void randomizeBytes(byte[] data, int offset, int length) {
for(int i=offset + length - 1; i >= offset; --i) {
@@ -158,12 +157,6 @@
Writable value,
OutputCollector output,
Reporter reporter) throws IOException {
- String filename = ((Text) key).toString();
- SequenceFile.Writer writer =
- SequenceFile.createWriter(fileSys, jobConf,
- new Path(outputDir, filename),
- BytesWritable.class, BytesWritable.class,
- CompressionType.NONE, reporter);
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
@@ -174,7 +167,7 @@
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.get(), 0, randomValue.getSize());
- writer.append(randomKey, randomValue);
+ output.collect(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
@@ -184,7 +177,6 @@
}
}
reporter.setStatus("done with " + itemCount + " records.");
- writer.close();
}
/**
@@ -192,14 +184,6 @@
* the data.
*/
public void configure(JobConf job) {
- jobConf = job;
- try {
- fileSys = FileSystem.get(job);
- } catch (IOException e) {
- throw new RuntimeException("Can't get default file system", e);
- }
- outputDir = job.getOutputPath();
-
numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
1*1024*1024*1024);
minKeySize = job.getInt("test.randomwrite.min_key", 10);
@@ -219,18 +203,15 @@
*
* @throws IOException
*/
- public static void main(String[] args) throws IOException {
+ public int run(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage: writer <out-dir> [<config>]");
- return;
+ return -1;
}
+
Path outDir = new Path(args[0]);
- JobConf job;
- if (args.length >= 2) {
- job = new JobConf(new Path(args[1]));
- } else {
- job = new JobConf();
- }
+ JobConf job = new JobConf(conf);
+
job.setJarByClass(RandomWriter.class);
job.setJobName("random-writer");
job.setOutputPath(outDir);
@@ -241,7 +222,7 @@
job.setInputFormat(RandomInputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(IdentityReducer.class);
- job.setOutputFormat(NullOutputFormat.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
JobClient client = new JobClient(job);
ClusterStatus cluster = client.getClusterStatus();
@@ -250,7 +231,7 @@
1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
- System.exit(-1);
+ return -2;
}
long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
@@ -262,7 +243,9 @@
job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
- job.setNumReduceTasks(1);
+
+ // reducer NONE
+ job.setNumReduceTasks(0);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
@@ -272,6 +255,13 @@
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
+
+ return 0;
}
+ public static void main(String[] args) throws Exception {
+ int res = new RandomWriter().doMain(new Configuration(), args);
+ System.exit(res);
+ }
+
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 15 14:56:37 2007
@@ -239,7 +239,7 @@
JobConf job, Reporter reporter) throws IOException {
this.job = job;
this.reporter = reporter;
- String finalName = getTipId();
+ String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(this.job);
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 15 14:56:37 2007
@@ -26,7 +26,6 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.text.DecimalFormat;
-import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -373,20 +372,6 @@
throw ioe;
}
done(umbilical);
- }
-
- /** Construct output file names so that, when an output directory listing is
- * sorted lexicographically, positions correspond to output partitions.*/
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- static synchronized String getOutputName(int partition) {
- return "part-" + NUMBER_FORMAT.format(partition);
}
private class ReduceCopier implements MRConstants {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Fri Jun 15 14:56:37 2007
@@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
+import java.text.NumberFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,8 +55,23 @@
REDUCE_INPUT_RECORDS,
REDUCE_OUTPUT_RECORDS
}
+
+ ///////////////////////////////////////////////////////////
+ // Helper methods to construct task-output paths
+ ///////////////////////////////////////////////////////////
-
+ /** Construct output file names so that, when an output directory listing is
+ * sorted lexicographically, positions correspond to output partitions.*/
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ static synchronized String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
+ }
+
////////////////////////////////////////////
// Fields
////////////////////////////////////////////
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=547807&r1=547806&r2=547807
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Fri Jun 15 14:56:37 2007
@@ -305,7 +305,7 @@
assertTrue("job was complete", rj.isComplete());
assertTrue("job was successful", rj.isSuccessful());
Path output = new Path(outDir,
- ReduceTask.getOutputName(0));
+ Task.getOutputName(0));
assertTrue("reduce output exists " + output, fs.exists(output));
SequenceFile.Reader rdr =
new SequenceFile.Reader(fs, output, conf);