Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/24 20:15:10 UTC

svn commit: r587950 - in /lucene/hadoop/branches/branch-0.15: CHANGES.txt src/java/org/apache/hadoop/util/CopyFiles.java src/java/org/apache/hadoop/util/CopyFiles_Counter.properties

Author: omalley
Date: Wed Oct 24 11:15:09 2007
New Revision: 587950

URL: http://svn.apache.org/viewvc?rev=587950&view=rev
Log:
Merge -r 587947:587948 from trunk to branch 0.15. Fixes HADOOP-2048.

Added:
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
      - copied unchanged from r587948, lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=587950&r1=587949&r2=587950&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Wed Oct 24 11:15:09 2007
@@ -322,6 +322,10 @@
     HADOOP-2080.  Fixed calculation of the checksum file size when the values
     are large. (omalley)
 
+    HADOOP-2048.  Change error handling in distcp so that each map copies
+    as much as possible before reporting the error. Also report progress on
+    every copy. (Chris Douglas via omalley)
+ 
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 
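
The HADOOP-2048 entry above describes the pattern that the CopyFiles.java changes
below implement: rather than failing a distcp map task on the first bad copy, each
map tallies skips and failures in job counters, keeps copying, and only throws from
close() when failures occurred and -i (ignore failures) was not given. A minimal
sketch of that idea, using hypothetical names (ResilientCopySketch, copyOne) rather
than the committed code:

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.Reporter;

  // Sketch only: per-map failure counting, with the task failure deferred to close().
  class ResilientCopySketch {
    enum Counter { COPY, FAIL }

    private int failcount = 0;
    private boolean ignoreFailures;        // would be set from the -i flag

    // Hypothetical per-file copy; the real code streams buffers and calls
    // reporter.setStatus(...) on every write so the task is not timed out.
    void copyOne(Path src, Path dst, Reporter reporter) throws IOException { }

    void map(Path src, Path dst, Reporter reporter) {
      try {
        copyOne(src, dst, reporter);
        reporter.incrCounter(Counter.COPY, 1);
      } catch (IOException e) {
        ++failcount;                       // record the failure but keep copying
        reporter.incrCounter(Counter.FAIL, 1);
      }
    }

    void close() throws IOException {
      if (failcount > 0 && !ignoreFailures) {
        throw new IOException(failcount + " copies failed");
      }
    }
  }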

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java?rev=587950&r1=587949&r2=587950&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java Wed Oct 24 11:15:09 2007
@@ -27,10 +27,12 @@
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Enumeration;
 import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Stack;
 import java.util.StringTokenizer;
@@ -75,7 +77,7 @@
     "\n-i                     Ignore failures" +
     "\n-log <logdir>          Write logs to <logdir>" +
     "\n-overwrite             Overwrite destination" +
-    "\n-update                Overwrite if src modif time later than dst" +
+    "\n-update                Overwrite if src size different from dst size" +
     "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
     "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
     "\n      interpreted as an isomorphic update to an existing directory." +
@@ -87,9 +89,10 @@
 
   private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
   private static final int MAX_MAPS_PER_NODE = 20;
-
   private static final int SYNC_FILE_MAX = 10;
 
+  static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
+
   private JobConf conf;
 
   public void setConf(Configuration conf) {
@@ -227,10 +230,19 @@
     private JobConf job;
 
     // stats
-    private static final long reportInterval = BYTES_PER_MAP / 8;
-    private long bytesSinceLastReport = 0L;
-    private long totalBytesCopied = 0L;
     private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
+    private int failcount = 0;
+    private int skipcount = 0;
+    private int copycount = 0;
+
+    // hack
+    private Reporter rep;
+
+    private void updateStatus() {
+      rep.setStatus("Copied: " + copycount + " Skipped: " + skipcount +
+                    " Failed: " + failcount);
+    }
+
 
     /**
      * Copy a file to a destination.
@@ -268,6 +280,7 @@
         dstpath = destParent;
       }
 
+      long cbcopied = 0L;
       FSDataInputStream in = null;
       FSDataOutputStream out = null;
       try {
@@ -275,29 +288,37 @@
            && (!overwrite && !(update
                && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
           outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
-          reporter.setStatus("Skipped " + srcstat.getPath());
+          ++skipcount;
+          reporter.incrCounter(Counter.SKIP, 1);
+          updateStatus();
           return;
         }
         // open src file
         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
-        long totalBytes = srcstat.getLen();
+        final long cblen = srcstat.getLen();
+        reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
         // open dst file
         out = preserve_status
           ? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
              srcstat.getBlockSize(), reporter)
           : destFileSys.create(dstpath, reporter);
         // copy file
-        int nread;
-        while ((nread = in.read(buffer)) >= 0) {
-          out.write(buffer, 0, nread);
-          bytesSinceLastReport += nread;
-          if (bytesSinceLastReport > reportInterval) {
-            totalBytesCopied += bytesSinceLastReport;
-            bytesSinceLastReport = 0L;
-            reporter.setStatus("Copy " + dstpath + ": " +
-                pcntfmt.format(100.0 * totalBytesCopied / totalBytes) + "% and "
-                + StringUtils.humanReadableInt(totalBytesCopied) + " bytes");
-          }
+        int cbread;
+        while ((cbread = in.read(buffer)) >= 0) {
+          out.write(buffer, 0, cbread);
+          cbcopied += cbread;
+          reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
+              " " + dstpath + " [ " +
+              StringUtils.humanReadableInt(cbcopied) + " / " +
+              StringUtils.humanReadableInt(cblen) + " ]");
+        }
+        if (cbcopied != cblen) {
+          final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
+              StringUtils.humanReadableInt(cbcopied) + ") expected " +
+              cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
+              ") from " + srcstat.getPath();
+          LOG.warn(badlen);
+          outc.collect(null, new Text(badlen));
         }
       } finally {
         if (in != null)
@@ -306,10 +327,10 @@
           out.close();
       }
       // report at least once for each file
-      totalBytesCopied += bytesSinceLastReport;
-      bytesSinceLastReport = 0L;
-      reporter.setStatus("Finished. Bytes copied: " +
-                         StringUtils.humanReadableInt(totalBytesCopied));
+      ++copycount;
+      reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
+      reporter.incrCounter(Counter.COPY, 1);
+      updateStatus();
     }
 
     /** Mapper configuration.
@@ -347,26 +368,44 @@
       FileStatus srcstat = value.input;
       Path dstpath = value.output;
       try {
+        rep = reporter;
         copy(srcstat, dstpath, out, reporter);
-      } catch (IOException except) {
-        out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
-              " : " + StringUtils.stringifyException(except)));
-        if (ignoreReadFailures) {
-          reporter.setStatus("Failed to copy " + srcstat.getPath() + " : " +
-              StringUtils.stringifyException(except));
-          try {
-            destFileSys.delete(dstpath);
-          } catch (Throwable ex) {
-            // ignore, we are just cleaning up
-            LOG.debug("Ignoring cleanup exception", ex);
+      } catch (IOException e) {
+        ++failcount;
+        reporter.incrCounter(Counter.FAIL, 1);
+        updateStatus();
+        final String sfailure = "FAIL " + dstpath + " : " +
+                          StringUtils.stringifyException(e);
+        out.collect(null, new Text(sfailure));
+        LOG.info(sfailure);
+        try {
+          for (int i = 0; i < 3; ++i) {
+            try {
+              if (destFileSys.delete(dstpath))
+                break;
+            } catch (Throwable ex) {
+              // ignore, we are just cleaning up
+              LOG.debug("Ignoring cleanup exception", ex);
+            }
+            // update status, so we don't get timed out
+            updateStatus();
+            Thread.sleep(3 * 1000);
           }
-        } else {
-          throw except;
+        } catch (InterruptedException inte) {
+          throw (IOException)new IOException().initCause(inte);
         }
+        updateStatus();
       }
     }
 
-    public void close() { }
+    public void close() throws IOException {
+      updateStatus();
+      if (0 == failcount || ignoreReadFailures) {
+        return;
+      }
+      throw new IOException("Copied: " + copycount + " Skipped: " + skipcount +
+          " Failed: " + failcount);
+    }
 
   }
 
@@ -514,6 +553,8 @@
     try {
       copy(conf, srcPath, destPath, logPath, flags);
     } catch (Exception e) {
+      System.err.println("With failures, global counters are inaccurate; " +
+          "consider running with -i");
       System.err.println("Copy failed: " + StringUtils.stringifyException(e));
       return -1;
     }