You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/02 20:16:37 UTC

svn commit: r452156 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapOutputLocation.java

Author: cutting
Date: Mon Oct  2 11:16:37 2006
New Revision: 452156

URL: http://svn.apache.org/viewvc?view=rev&rev=452156
Log:
HADOOP-552.  Improved error checking when copying map output files to reduce nodes.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=452156&r1=452155&r2=452156
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Oct  2 11:16:37 2006
@@ -100,6 +100,9 @@
     IllegalStateException's were logged, sets content-length
     correctly, and better handles some errors.  (omalley via cutting)
 
+25. HADOOP-552.  Improved error checking when copying map output files
+    to reduce nodes.  (omalley via cutting)
+
 
 Release 0.6.2 - 2006-09-18
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=452156&r1=452155&r2=452156
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon Oct  2 11:16:37 2006
@@ -19,7 +19,7 @@
 import java.io.IOException;
 
 import java.io.*;
-import java.net.URL;
+import java.net.*;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
@@ -104,24 +104,46 @@
                       Path localFilename, 
                       int reduce,
                       Progressable pingee) throws IOException {
-    URL path = new URL(toString() + "&reduce=" + reduce);
-    InputStream input = path.openConnection().getInputStream();
-    OutputStream output = fileSys.create(localFilename);
+    boolean good = false;
     long totalBytes = 0;
+    URL path = new URL(toString() + "&reduce=" + reduce);
     try {
-      byte[] buffer = new byte[64 * 1024];
-      int len = input.read(buffer);
-      while (len > 0) {
-        totalBytes += len;
-        output.write(buffer, 0 ,len);
-        if (pingee != null) {
-          pingee.progress();
+      URLConnection connection = path.openConnection();
+      InputStream input = connection.getInputStream();
+      try {
+        OutputStream output = fileSys.create(localFilename);
+        try {
+          byte[] buffer = new byte[64 * 1024];
+          int len = input.read(buffer);
+          while (len > 0) {
+            totalBytes += len;
+            output.write(buffer, 0 ,len);
+            if (pingee != null) {
+              pingee.progress();
+            }
+            len = input.read(buffer);
+          }
+        } finally {
+          output.close();
         }
-        len = input.read(buffer);
+      } finally {
+        input.close();
+      }
+      good = ((int) totalBytes) == connection.getContentLength();
+      if (!good) {
+        throw new IOException("Incomplete map output received for " + path +
+                              " (" + totalBytes + " instead of " + 
+                              connection.getContentLength() + ")");
       }
     } finally {
-      input.close();
-      output.close();
+      if (!good) {
+        try {
+          fileSys.delete(localFilename);
+          totalBytes = 0;
+        } catch (Throwable th) {
+          // IGNORED because we are cleaning up
+        }
+      }
     }
     return totalBytes;
   }