You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/08/10 22:25:51 UTC

svn commit: r564751 - in /lucene/hadoop/branches/branch-0.14: ./ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Aug 10 13:25:50 2007
New Revision: 564751

URL: http://svn.apache.org/viewvc?view=rev&rev=564751
Log:
Merge -r 564740:564744 from trunk to 0.14 branch. Fixes HADOOP-71.

Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Fri Aug 10 13:25:50 2007
@@ -463,6 +463,9 @@
 148. HADOOP-1680.  Improvements to Block CRC upgrade messages.
      (Raghu Angadi via dhruba)
 
+149. HADOOP-71.  Allow Text and SequenceFile Map/Reduce inputs from non-default 
+     filesystems. (omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/LineRecordReader.java Fri Aug 10 13:25:50 2007
@@ -69,7 +69,7 @@
     final CompressionCodec codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = FileSystem.get(job);
+    FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
     boolean skipFirstLine = false;

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/MapTask.java Fri Aug 10 13:25:50 2007
@@ -210,14 +210,11 @@
 
     private Reporter reporter = null;
 
-    private JobConf job;
-
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
         JobConf job, Reporter reporter) throws IOException {
-      this.job = job;
       this.reporter = reporter;
       String finalName = getOutputName(getPartition());
-      FileSystem fs = FileSystem.get(this.job);
+      FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
     }

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Fri Aug 10 13:25:50 2007
@@ -20,10 +20,11 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
 
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** An {@link RecordReader} for {@link SequenceFile}s. */
@@ -36,8 +37,9 @@
 
   public SequenceFileRecordReader(Configuration conf, FileSplit split)
     throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    this.in = new SequenceFile.Reader(fs, split.getPath(), conf);
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new SequenceFile.Reader(fs, path, conf);
     this.end = split.getStart() + split.getLength();
     this.conf = conf;
 

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/mapred/Task.java Fri Aug 10 13:25:50 2007
@@ -180,7 +180,8 @@
   private Path getTaskOutputPath(JobConf conf) {
     Path p = new Path(conf.getOutputPath(), ("_" + taskId));
     try {
-      return p.makeQualified(FileSystem.get(conf));
+      FileSystem fs = p.getFileSystem(conf);
+      return p.makeQualified(fs);
     } catch (IOException ie) {
       LOG.warn(StringUtils.stringifyException(ie));
       return p;
@@ -390,21 +391,23 @@
    * @throws IOException
    */
   void saveTaskOutput() throws IOException {
-    FileSystem fs = FileSystem.get(conf);
 
-    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
-      Path jobOutputPath = taskOutputPath.getParent();
-
-      // Move the task outputs to their final place
-      moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
-      // Delete the temporary task-specific output directory
-      if (!fs.delete(taskOutputPath)) {
-        LOG.info("Failed to delete the temporary output directory of task: " + 
-                getTaskId() + " - " + taskOutputPath);
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(conf);
+      if (fs.exists(taskOutputPath)) {
+        Path jobOutputPath = taskOutputPath.getParent();
+
+        // Move the task outputs to their final place
+        moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
+
+        // Delete the temporary task-specific output directory
+        if (!fs.delete(taskOutputPath)) {
+          LOG.info("Failed to delete the temporary output directory of task: " + 
+                  getTaskId() + " - " + taskOutputPath);
+        }
+        
+        LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
       }
-      
-      LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
     }
   }
   
@@ -439,13 +442,14 @@
    * @throws IOException
    */
   void discardTaskOutput() throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-
-    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
-      // Delete the temporary task-specific output directory
-      FileUtil.fullyDelete(fs, taskOutputPath);
-      LOG.info("Discarded output of task '" + getTaskId() + "' - " 
-              + taskOutputPath);
+    if (taskOutputPath != null) {
+      FileSystem fs = taskOutputPath.getFileSystem(conf);
+      if (fs.exists(taskOutputPath)) {
+        // Delete the temporary task-specific output directory
+        FileUtil.fullyDelete(fs, taskOutputPath);
+        LOG.info("Discarded output of task '" + getTaskId() + "' - " 
+                + taskOutputPath);
+      }
     }
   }
 

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=564751&r1=564750&r2=564751
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Aug 10 13:25:50 2007
@@ -51,18 +51,19 @@
     }
   }
   public static TestResult launchWordCount(JobConf conf,
+                                           Path inDir,
+                                           Path outDir,
                                            String input,
                                            int numMaps,
                                            int numReduces) throws IOException {
-    final Path inDir = new Path("/testing/wc/input");
-    final Path outDir = new Path("/testing/wc/output");
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir);
-    if (!fs.mkdirs(inDir)) {
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir);
+    if (!inFs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
     }
     {
-      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
       file.writeBytes(input);
       file.close();
     }
@@ -177,7 +178,9 @@
       // Keeping tasks that match this pattern
       jobConf.setKeepTaskFilesPattern("task_[^_]*_[0-9]*_m_000001_.*");
       TestResult result;
-      result = launchWordCount(jobConf, 
+      final Path inDir = new Path("/testing/wc/input");
+      final Path outDir = new Path("/testing/wc/output");
+      result = launchWordCount(jobConf, inDir, outDir,
                                "The quick brown fox\nhas many silly\n" + 
                                "red fox sox\n",
                                3, 1);
@@ -188,8 +191,25 @@
       checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
       // test with maps=0
       jobConf = mr.createJobConf();
-      result = launchWordCount(jobConf, "owen is oom", 0, 1);
+      result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
       assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+      // Run a job with input and output going to localfs even though the 
+      // default fs is hdfs.
+      {
+        FileSystem localfs = FileSystem.getLocal(jobConf);
+        String TEST_ROOT_DIR =
+          new File(System.getProperty("test.build.data","/tmp"))
+          .toString().replace(' ', '+');
+        Path localIn = localfs.makeQualified
+                          (new Path(TEST_ROOT_DIR + "/local/in"));
+        Path localOut = localfs.makeQualified
+                          (new Path(TEST_ROOT_DIR + "/local/out"));
+        result = launchWordCount(jobConf, localIn, localOut,
+                                 "all your base belong to us", 1, 1);
+        assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
+                     result.output);
+        assertTrue("outputs on localfs", localfs.exists(localOut));
+      }
     } finally {
       if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }