You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/24 20:15:10 UTC
svn commit: r587950 - in /lucene/hadoop/branches/branch-0.15: CHANGES.txt
src/java/org/apache/hadoop/util/CopyFiles.java
src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
Author: omalley
Date: Wed Oct 24 11:15:09 2007
New Revision: 587950
URL: http://svn.apache.org/viewvc?rev=587950&view=rev
Log:
Merge -r 587947:587948 from trunk to branch 0.15. Fixes HADOOP-2048.
Added:
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
- copied unchanged from r587948, lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles_Counter.properties
Modified:
lucene/hadoop/branches/branch-0.15/CHANGES.txt
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java
Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=587950&r1=587949&r2=587950&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Wed Oct 24 11:15:09 2007
@@ -322,6 +322,10 @@
HADOOP-2080. Fixed calculation of the checksum file size when the values
are large. (omalley)
+ HADOOP-2048. Change error handling in distcp so that each map copies
+ as much as possible before reporting the error. Also report progress on
+ every copy. (Chris Douglas via omalley)
+
IMPROVEMENTS
HADOOP-1908. Restructure data node code so that block sending and
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java?rev=587950&r1=587949&r2=587950&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/util/CopyFiles.java Wed Oct 24 11:15:09 2007
@@ -27,10 +27,12 @@
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Enumeration;
import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Stack;
import java.util.StringTokenizer;
@@ -75,7 +77,7 @@
"\n-i Ignore failures" +
"\n-log <logdir> Write logs to <logdir>" +
"\n-overwrite Overwrite destination" +
- "\n-update Overwrite if src modif time later than dst" +
+ "\n-update Overwrite if src size different from dst size" +
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
"\n\nNOTE: if -overwrite or -update are set, each source URI is " +
"\n interpreted as an isomorphic update to an existing directory." +
@@ -87,9 +89,10 @@
private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
private static final int MAX_MAPS_PER_NODE = 20;
-
private static final int SYNC_FILE_MAX = 10;
+ static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
+
private JobConf conf;
public void setConf(Configuration conf) {
@@ -227,10 +230,19 @@
private JobConf job;
// stats
- private static final long reportInterval = BYTES_PER_MAP / 8;
- private long bytesSinceLastReport = 0L;
- private long totalBytesCopied = 0L;
private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
+ private int failcount = 0;
+ private int skipcount = 0;
+ private int copycount = 0;
+
+ // hack
+ private Reporter rep;
+
+ private void updateStatus() {
+ rep.setStatus("Copied: " + copycount + " Skipped: " + skipcount +
+ " Failed: " + failcount);
+ }
+
/**
* Copy a file to a destination.
@@ -268,6 +280,7 @@
dstpath = destParent;
}
+ long cbcopied = 0L;
FSDataInputStream in = null;
FSDataOutputStream out = null;
try {
@@ -275,29 +288,37 @@
&& (!overwrite && !(update
&& needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
- reporter.setStatus("Skipped " + srcstat.getPath());
+ ++skipcount;
+ reporter.incrCounter(Counter.SKIP, 1);
+ updateStatus();
return;
}
// open src file
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
- long totalBytes = srcstat.getLen();
+ final long cblen = srcstat.getLen();
+ reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
// open dst file
out = preserve_status
? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
srcstat.getBlockSize(), reporter)
: destFileSys.create(dstpath, reporter);
// copy file
- int nread;
- while ((nread = in.read(buffer)) >= 0) {
- out.write(buffer, 0, nread);
- bytesSinceLastReport += nread;
- if (bytesSinceLastReport > reportInterval) {
- totalBytesCopied += bytesSinceLastReport;
- bytesSinceLastReport = 0L;
- reporter.setStatus("Copy " + dstpath + ": " +
- pcntfmt.format(100.0 * totalBytesCopied / totalBytes) + "% and "
- + StringUtils.humanReadableInt(totalBytesCopied) + " bytes");
- }
+ int cbread;
+ while ((cbread = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, cbread);
+ cbcopied += cbread;
+ reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
+ " " + dstpath + " [ " +
+ StringUtils.humanReadableInt(cbcopied) + " / " +
+ StringUtils.humanReadableInt(cblen) + " ]");
+ }
+ if (cbcopied != cblen) {
+ final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
+ StringUtils.humanReadableInt(cbcopied) + ") expected " +
+ cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
+ ") from " + srcstat.getPath();
+ LOG.warn(badlen);
+ outc.collect(null, new Text(badlen));
}
} finally {
if (in != null)
@@ -306,10 +327,10 @@
out.close();
}
// report at least once for each file
- totalBytesCopied += bytesSinceLastReport;
- bytesSinceLastReport = 0L;
- reporter.setStatus("Finished. Bytes copied: " +
- StringUtils.humanReadableInt(totalBytesCopied));
+ ++copycount;
+ reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
+ reporter.incrCounter(Counter.COPY, 1);
+ updateStatus();
}
/** Mapper configuration.
@@ -347,26 +368,44 @@
FileStatus srcstat = value.input;
Path dstpath = value.output;
try {
+ rep = reporter;
copy(srcstat, dstpath, out, reporter);
- } catch (IOException except) {
- out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
- " : " + StringUtils.stringifyException(except)));
- if (ignoreReadFailures) {
- reporter.setStatus("Failed to copy " + srcstat.getPath() + " : " +
- StringUtils.stringifyException(except));
- try {
- destFileSys.delete(dstpath);
- } catch (Throwable ex) {
- // ignore, we are just cleaning up
- LOG.debug("Ignoring cleanup exception", ex);
+ } catch (IOException e) {
+ ++failcount;
+ reporter.incrCounter(Counter.FAIL, 1);
+ updateStatus();
+ final String sfailure = "FAIL " + dstpath + " : " +
+ StringUtils.stringifyException(e);
+ out.collect(null, new Text(sfailure));
+ LOG.info(sfailure);
+ try {
+ for (int i = 0; i < 3; ++i) {
+ try {
+ if (destFileSys.delete(dstpath))
+ break;
+ } catch (Throwable ex) {
+ // ignore, we are just cleaning up
+ LOG.debug("Ignoring cleanup exception", ex);
+ }
+ // update status, so we don't get timed out
+ updateStatus();
+ Thread.sleep(3 * 1000);
}
- } else {
- throw except;
+ } catch (InterruptedException inte) {
+ throw (IOException)new IOException().initCause(inte);
}
+ updateStatus();
}
}
- public void close() { }
+ public void close() throws IOException {
+ updateStatus();
+ if (0 == failcount || ignoreReadFailures) {
+ return;
+ }
+ throw new IOException("Copied: " + copycount + " Skipped: " + skipcount +
+ " Failed: " + failcount);
+ }
}
@@ -514,6 +553,8 @@
try {
copy(conf, srcPath, destPath, logPath, flags);
} catch (Exception e) {
+ System.err.println("With failures, global counters are inaccurate; " +
+ "consider running with -i");
System.err.println("Copy failed: " + StringUtils.stringifyException(e));
return -1;
}