You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/10 22:25:51 UTC
svn commit: r564751 - in /lucene/hadoop/branches/branch-0.14: ./
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Aug 10 13:25:50 2007
New Revision: 564751
URL: http://svn.apache.org/viewvc?view=rev&rev=564751
Log:
Merge -r 564740:564744 from trunk to 0.14 branch. Fixes HADOOP-71.
Modified:
lucene/hadoop/branches/branch-0.14/CHANGES.txt
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Fri Aug 10 13:25:50 2007
@@ -463,6 +463,9 @@
148. HADOOP-1680. Improvements to Block CRC upgrade messages.
(Raghu Angadi via dhruba)
+149. HADOOP-71. Allow Text and SequenceFile Map/Reduce inputs from non-default
+ filesystems. (omalley)
+
Release 0.13.0 - 2007-06-08
1. HADOOP-1047. Fix TestReplication to succeed more reliably.
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Aug 10 13:25:50 2007
@@ -69,7 +69,7 @@
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
- FileSystem fs = FileSystem.get(job);
+ FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
InputStream in = fileIn;
boolean skipFirstLine = false;
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java Fri Aug 10 13:25:50 2007
@@ -210,14 +210,11 @@
private Reporter reporter = null;
- private JobConf job;
-
public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
JobConf job, Reporter reporter) throws IOException {
- this.job = job;
this.reporter = reporter;
String finalName = getOutputName(getPartition());
- FileSystem fs = FileSystem.get(this.job);
+ FileSystem fs = FileSystem.get(job);
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
}
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Fri Aug 10 13:25:50 2007
@@ -20,10 +20,11 @@
import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
/** An {@link RecordReader} for {@link SequenceFile}s. */
@@ -36,8 +37,9 @@
public SequenceFileRecordReader(Configuration conf, FileSplit split)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
- this.in = new SequenceFile.Reader(fs, split.getPath(), conf);
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ this.in = new SequenceFile.Reader(fs, path, conf);
this.end = split.getStart() + split.getLength();
this.conf = conf;
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java Fri Aug 10 13:25:50 2007
@@ -180,7 +180,8 @@
private Path getTaskOutputPath(JobConf conf) {
Path p = new Path(conf.getOutputPath(), ("_" + taskId));
try {
- return p.makeQualified(FileSystem.get(conf));
+ FileSystem fs = p.getFileSystem(conf);
+ return p.makeQualified(fs);
} catch (IOException ie) {
LOG.warn(StringUtils.stringifyException(ie));
return p;
@@ -390,21 +391,23 @@
* @throws IOException
*/
void saveTaskOutput() throws IOException {
- FileSystem fs = FileSystem.get(conf);
- if (taskOutputPath != null && fs.exists(taskOutputPath)) {
- Path jobOutputPath = taskOutputPath.getParent();
-
- // Move the task outputs to their final place
- moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
- // Delete the temporary task-specific output directory
- if (!fs.delete(taskOutputPath)) {
- LOG.info("Failed to delete the temporary output directory of task: " +
- getTaskId() + " - " + taskOutputPath);
+ if (taskOutputPath != null) {
+ FileSystem fs = taskOutputPath.getFileSystem(conf);
+ if (fs.exists(taskOutputPath)) {
+ Path jobOutputPath = taskOutputPath.getParent();
+
+ // Move the task outputs to their final place
+ moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
+
+ // Delete the temporary task-specific output directory
+ if (!fs.delete(taskOutputPath)) {
+ LOG.info("Failed to delete the temporary output directory of task: " +
+ getTaskId() + " - " + taskOutputPath);
+ }
+
+ LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
}
-
- LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
}
}
@@ -439,13 +442,14 @@
* @throws IOException
*/
void discardTaskOutput() throws IOException {
- FileSystem fs = FileSystem.get(conf);
-
- if (taskOutputPath != null && fs.exists(taskOutputPath)) {
- // Delete the temporary task-specific output directory
- FileUtil.fullyDelete(fs, taskOutputPath);
- LOG.info("Discarded output of task '" + getTaskId() + "' - "
- + taskOutputPath);
+ if (taskOutputPath != null) {
+ FileSystem fs = taskOutputPath.getFileSystem(conf);
+ if (fs.exists(taskOutputPath)) {
+ // Delete the temporary task-specific output directory
+ FileUtil.fullyDelete(fs, taskOutputPath);
+ LOG.info("Discarded output of task '" + getTaskId() + "' - "
+ + taskOutputPath);
+ }
}
}
Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Aug 10 13:25:50 2007
@@ -51,18 +51,19 @@
}
}
public static TestResult launchWordCount(JobConf conf,
+ Path inDir,
+ Path outDir,
String input,
int numMaps,
int numReduces) throws IOException {
- final Path inDir = new Path("/testing/wc/input");
- final Path outDir = new Path("/testing/wc/output");
- FileSystem fs = FileSystem.get(conf);
- fs.delete(outDir);
- if (!fs.mkdirs(inDir)) {
+ FileSystem inFs = inDir.getFileSystem(conf);
+ FileSystem outFs = outDir.getFileSystem(conf);
+ outFs.delete(outDir);
+ if (!inFs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
{
- DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
file.writeBytes(input);
file.close();
}
@@ -177,7 +178,9 @@
// Keeping tasks that match this pattern
jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
TestResult result;
- result = launchWordCount(jobConf,
+ final Path inDir = new Path("/testing/wc/input");
+ final Path outDir = new Path("/testing/wc/output");
+ result = launchWordCount(jobConf, inDir, outDir,
"The quick brown fox\nhas many silly\n" +
"red fox sox\n",
3, 1);
@@ -188,8 +191,25 @@
checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
// test with maps=0
jobConf = mr.createJobConf();
- result = launchWordCount(jobConf, "owen is oom", 0, 1);
+ result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+ // Run a job with input and output going to localfs even though the
+ // default fs is hdfs.
+ {
+ FileSystem localfs = FileSystem.getLocal(jobConf);
+ String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data","/tmp"))
+ .toString().replace(' ', '+');
+ Path localIn = localfs.makeQualified
+ (new Path(TEST_ROOT_DIR + "/local/in"));
+ Path localOut = localfs.makeQualified
+ (new Path(TEST_ROOT_DIR + "/local/out"));
+ result = launchWordCount(jobConf, localIn, localOut,
+ "all your base belong to us", 1, 1);
+ assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n",
+ result.output);
+ assertTrue("outputs on localfs", localfs.exists(localOut));
+ }
} finally {
if (fileSys != null) { fileSys.close(); }
if (dfs != null) { dfs.shutdown(); }