You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/09/06 09:04:32 UTC
svn commit: r573166 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/util/CopyFiles.java
src/test/org/apache/hadoop/fs/TestCopyFiles.java
Author: omalley
Date: Thu Sep 6 00:04:31 2007
New Revision: 573166
URL: http://svn.apache.org/viewvc?rev=573166&view=rev
Log:
HADOOP-1569. Fixes DistCP to use the standard FileSystem interface.
Contributed by Chris Douglas.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 6 00:04:31 2007
@@ -163,6 +163,9 @@
HADOOP-1425. Replace uses of ToolBase with the Tool interface.
(Enis Soztutar via cutting)
+ HADOOP-1569. Reimplement DistCP to use the standard FileSystem/URI
+ code in Hadoop so that you can copy from and to all of the supported file
+ systems.(Chris Douglas via omalley)
Release 0.14.1 - 2007-09-04
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Thu Sep 6 00:04:31 2007
@@ -18,24 +18,25 @@
package org.apache.hadoop.util;
-import java.io.BufferedInputStream;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.EnumSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
+import java.util.Stack;
import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -43,39 +44,54 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
/**
* A Map-reduce program to recursively copy directories between
* different file-systems.
*/
public class CopyFiles implements Tool {
- private static final String HDFS = "hdfs";
- private static final String S3 = "s3";
-
- private static final String usage = "distcp "+
- "[-i] <srcurl> | -f <urilist_uri> <desturl> [-log <logpath>]";
-
- private static final long MIN_BYTES_PER_MAP = 1L << 28;
- private static final int MAX_NUM_MAPS = 10000;
- private static final int MAX_MAPS_PER_NODE = 10;
-
- private static final String readFailuresAttribute =
- "distcp.ignore.read.failures";
-
+ private static final Log LOG = LogFactory.getLog(CopyFiles.class);
+
+ private static final String usage =
+ "distcp [OPTIONS] <srcurl>* <desturl>" +
+ "\n\nOPTIONS:" +
+ "\n-p Preserve status" +
+ "\n-i Ignore failures" +
+ "\n-log <logdir> Write logs to <logdir>" +
+ "\n-overwrite Overwrite destination" +
+ "\n-update Overwrite if src modif time later than dst" +
+ "\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
+ "\n\nNOTE: if -overwrite or -update are set, each source URI is " +
+ "\n interpreted as an isomorphic update to an existing directory." +
+ "\nFor example:" +
+ "\nhadoop distcp -p -update \"hdfs://A:8020/user/foo/bar\" " +
+ "\"hdfs://B:8020/user/foo/baz\"\n" +
+ "\n would update all descendants of 'baz' also in 'bar'; it would " +
+ "\n *not* update /user/foo/baz/bar\n";
+
+ private static final long BYTES_PER_MAP = 256 * 1024 * 1024;
+ private static final int MAX_MAPS_PER_NODE = 20;
+
+ private static final int SYNC_FILE_MAX = 10;
+
private JobConf conf;
-
+
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
@@ -83,723 +99,354 @@
this.conf = new JobConf(conf);
}
}
-
+
public Configuration getConf() {
return conf;
}
-
- public CopyFiles() {
- }
-
+
+ @Deprecated
+ public CopyFiles() { }
+
public CopyFiles(Configuration conf) {
setConf(conf);
}
-
+
/**
- * Base-class for all mappers for distcp
+ * An input/output pair of filenames.
*/
- public static abstract class CopyFilesMapper extends MapReduceBase
- {
- /**
- * Interface to initialize *distcp* specific map tasks.
- * @param conf : The dfs/mapred configuration.
- * @param jobConf : The handle to the jobConf object to be initialized.
- * @param srcPaths : The source paths.
- * @param destPath : The destination path.
- * @param logPath : The log path.
- * @param ignoreReadFailures : Ignore read failures?
- * @throws IOException
- */
- public abstract void setup(Configuration conf, JobConf jobConf,
- String[] srcPaths, String destPath,
- Path logPath, boolean ignoreReadFailures)
- throws IOException;
-
+ static class FilePair implements Writable {
+ FileStatus input = new FileStatus();
+ Path output;
+ FilePair() { }
+ FilePair(FileStatus input, Path output) {
+ this.input = input;
+ this.output = output;
+ }
+ public void readFields(DataInput in) throws IOException {
+ input.readFields(in);
+ output = new Path(Text.readString(in));
+ }
+ public void write(DataOutput out) throws IOException {
+ input.write(out);
+ Text.writeString(out, output.toString());
+ }
+ }
+
+ /**
+ * InputFormat of a distcp job responsible for generating splits of the src
+ * file list.
+ */
+ static class CopyInputFormat implements InputFormat {
+
/**
- * Interface to cleanup *distcp* specific resources
- * @param conf : The dfs/mapred configuration.
- * @param jobConf : The handle to the jobConf object to be initialized.
- * @param srcPath : The source uri.
- * @param destPath : The destination uri.
- * @throws IOException
+ * Does nothing.
*/
- public abstract void cleanup(Configuration conf, JobConf jobConf,
- String srcPath, String destPath) throws IOException;
-
+ public void validateInput(JobConf job) throws IOException { }
+
/**
- * Make a path relative with respect to a root path.
- * absPath is always assumed to descend from root.
- * Otherwise returned path is null.
+ * Produce splits such that each is no greater than the quotient of the
+ * total size and the number of splits requested.
+ * @param job The handle to the JobConf object
+ * @param numSplits Number of splits requested
*/
- public static Path makeRelative(Path root, Path absPath) {
- if (!absPath.isAbsolute()) { return absPath; }
- String sRoot = root.toUri().getPath();
- String sPath = absPath.toUri().getPath();
- Enumeration<Object> rootTokens = new StringTokenizer(sRoot, "/");
- ArrayList rList = Collections.list(rootTokens);
- Enumeration<Object> pathTokens = new StringTokenizer(sPath, "/");
- ArrayList pList = Collections.list(pathTokens);
- Iterator rIter = rList.iterator();
- Iterator pIter = pList.iterator();
- while (rIter.hasNext()) {
- String rElem = (String) rIter.next();
- String pElem = (String) pIter.next();
- if (!rElem.equals(pElem)) { return null; }
- }
- StringBuffer sb = new StringBuffer();
- while (pIter.hasNext()) {
- String pElem = (String) pIter.next();
- sb.append(pElem);
- if (pIter.hasNext()) { sb.append("/"); }
+ public InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ int cnfiles = job.getInt("distcp.file.count", -1);
+ long cbsize = job.getLong("distcp.total.size", -1);
+ String srcfilelist = job.get("distcp.src.list", "");
+ if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
+ throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
+ ") total_size(" + cbsize + ") listuri(" +
+ srcfilelist + ")");
+ }
+ Path src = new Path(srcfilelist);
+ FileSystem fs = src.getFileSystem(job);
+ FileStatus srcst = fs.getFileStatus(src);
+
+ ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ LongWritable key = new LongWritable();
+ FilePair value = new FilePair();
+ final long targetsize = cbsize / numSplits;
+ long pos = 0L;
+ long last = 0L;
+ long acc = 0L;
+ long cbrem = srcst.getLen();
+ for (SequenceFile.Reader sl = new SequenceFile.Reader(fs, src, job);
+ sl.next(key, value); last = sl.getPosition()) {
+ // if adding this split would put this split past the target size,
+ // cut the last split and put this next file in the next split.
+ if (acc + key.get() > targetsize && acc != 0) {
+ long splitsize = last - pos;
+ splits.add(new FileSplit(src, pos, splitsize, job));
+ cbrem -= splitsize;
+ pos = last;
+ acc = 0L;
+ }
+ acc += key.get();
+ }
+ if (cbrem != 0) {
+ splits.add(new FileSplit(src, pos, cbrem, job));
}
- return new Path(sb.toString());
+
+ return splits.toArray(new FileSplit[splits.size()]);
}
+
/**
- * Calculate how many maps to run.
- * Ideal number of maps is one per file (if the map-launching overhead
- * were 0). It is limited by jobtrackers handling capacity which, lets say,
- * is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for
- * small files it is better to determine number of maps by
- * amount of data per map.
- * @param initialEstimate Initial guess at number of maps (e.g. count of
- * files).
- * @param totalBytes Count of total bytes for job (If not known, pass -1).
- * @param client
- * @return Count of maps to run.
- * @throws IOException
+ * Returns a reader for this split of the src file list.
*/
- public int getMapCount(final int initialEstimate, final long totalBytes,
- final JobClient client)
- throws IOException {
- int numMaps = initialEstimate;
- if (numMaps > MAX_NUM_MAPS) {
- numMaps = MAX_NUM_MAPS;
- }
- if (totalBytes != -1 &&
- numMaps > (int)(totalBytes / MIN_BYTES_PER_MAP)) {
- numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
- }
- ClusterStatus cluster = client.getClusterStatus();
- int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
- if (numMaps > tmpMaps) {
- numMaps = tmpMaps;
- }
- if (numMaps == 0) {
- numMaps = 1;
- }
- return numMaps;
- }
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader(job, (FileSplit)split);
+ }
+ }
+
+ /**
+ * Return true if dst should be replaced by src and the update flag is set.
+ * Right now, this merely checks that the src and dst len are not equal. This
+ * should be improved on once modification times, CRCs, etc. can
+ * be meaningful in this context.
+ */
+ private static boolean needsUpdate(FileStatus src, FileStatus dst) {
+ return src.getLen() != dst.getLen();
}
-
+
/**
- * DFSCopyFilesMapper: The mapper for copying files from the DFS.
+ * FSCopyFilesMapper: The mapper for copying files between FileSystems.
*/
- public static class FSCopyFilesMapper extends CopyFilesMapper
- implements Mapper<Text, Writable, WritableComparable, Text>
- {
- private int sizeBuf = 4096;
- private FileSystem srcFileSys = null;
+ public static class FSCopyFilesMapper
+ implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
+ // config
+ private int sizeBuf = 128 * 1024;
private FileSystem destFileSys = null;
- private Path srcPath = null;
+ private boolean ignoreReadFailures;
+ private boolean preserve_status;
+ private boolean overwrite;
+ private boolean update;
private Path destPath = null;
private byte[] buffer = null;
- private static final long reportInterval = 1L << 25;
+ private JobConf job;
+
+ // stats
+ private static final long reportInterval = BYTES_PER_MAP / 8;
private long bytesSinceLastReport = 0L;
private long totalBytesCopied = 0L;
- private static DecimalFormat percentFormat = new DecimalFormat("0.00");
- private boolean ignoreReadFailures;
-
- private void copy(String src, Reporter reporter) throws IOException {
- // open source file
- Path srcFile = new Path(srcPath, src);
- FSDataInputStream in = srcFileSys.open(srcFile);
- FileStatus srcFileStatus = srcFileSys.getFileStatus(srcFile);
- long totalBytes = srcFileStatus.getLen();
-
- // create directories to hold destination file and create destFile
- Path destFile = new Path(destPath, src);
- Path destParent = destFile.getParent();
- if (destParent != null) {
- if (!destFileSys.mkdirs(destParent)) {
- throw new IOException("Mkdirs failed to create " +
- destParent.toString());
- }
- }
- FSDataOutputStream out = destFileSys.create(destFile);
-
- // copy file
- while (true) {
- int nread = in.read(buffer);
- if (nread < 0) { break; }
- out.write(buffer, 0, nread);
- bytesSinceLastReport += nread;
- if (bytesSinceLastReport > reportInterval) {
- totalBytesCopied += bytesSinceLastReport;
- bytesSinceLastReport = 0L;
- reporter.setStatus("Copy "+ src + ": " +
- percentFormat.format(100.0 * totalBytesCopied /
- totalBytes) +
- "% and " +
- StringUtils.humanReadableInt(totalBytesCopied) +
- " bytes");
- }
- }
-
- in.close();
- out.close();
- // report at least once for each file
- totalBytesCopied += bytesSinceLastReport;
- bytesSinceLastReport = 0L;
- reporter.setStatus("Finished. Bytes copied: " +
- StringUtils.humanReadableInt(totalBytesCopied));
- }
-
+ private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
+
/**
- * Initialize DFSCopyFileMapper specific job-configuration.
- * @param conf : The dfs/mapred configuration.
- * @param jobConf : The handle to the jobConf object to be initialized.
- * @param srcPaths : The source URIs.
- * @param destPath : The destination URI.
- * @param logPath : The log Path.
- * @param ignoreReadFailures : Ignore read failures?
+ * Copy a file to a destination.
+ * @param srcstat src path and metadata
+ * @param dstpath dst path
+ * @param reporter
*/
- @Override
- public void setup(Configuration conf, JobConf jobConf,
- String[] srcPaths, String destPath,
- Path logPath, boolean ignoreReadFailures)
- throws IOException
- {
- URI srcURI = toURI(srcPaths[0]);
- URI destURI = toURI(destPath);
-
- FileSystem srcfs = FileSystem.get(srcURI, conf);
- jobConf.set("copy.src.fs", srcURI.toString());
- jobConf.set("copy.dest.fs", destURI.toString());
-
- String srcPath = srcURI.getPath();
- if ("".equals(srcPath)) { srcPath = "/"; }
- destPath = destURI.getPath();
- if ("".equals(destPath)) { destPath = "/"; }
-
- Path tmpPath = new Path(srcPath);
- Path rootPath = new Path(srcPath);
- if (srcfs.isFile(tmpPath)) {
- tmpPath = tmpPath.getParent();
- rootPath = rootPath.getParent();
- jobConf.set("copy.src.path", tmpPath.toString());
- } else {
- jobConf.set("copy.src.path", srcPath);
- }
- jobConf.set("copy.dest.path", destPath);
-
- if (!srcfs.exists(tmpPath)) {
- System.out.println(srcPath+" does not exist.");
+ private void copy(FileStatus srcstat, Path dstpath, Reporter reporter)
+ throws IOException {
+
+ int totfiles = job.getInt("distcp.file.count", -1);
+ assert totfiles >= 0 : "Invalid file count " + totfiles;
+
+ // if a directory, ensure created even if empty
+ if (srcstat.isDir()) {
+ if (!destFileSys.mkdirs(dstpath)) {
+ throw new IOException("Failed to create" + dstpath);
+ }
+ // TODO: when modification times can be set, directories should be
+ // emitted to reducers so they might be preserved. Also, mkdirs does
+ // not currently return an error when the directory already exists;
+ // if this changes, all directory work might as well be done in reduce
return;
}
-
- // turn off speculative execution, because DFS doesn't handle
- // multiple writers to the same file.
- jobConf.setSpeculativeExecution(false);
- jobConf.setInputFormat(SequenceFileInputFormat.class);
-
- jobConf.setMapperClass(FSCopyFilesMapper.class);
-
- jobConf.setNumReduceTasks(0);
- jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
-
- Random r = new Random();
- Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_"
- + Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
- Path inDir = new Path(jobDirectory, "in");
- FileSystem fileSys = FileSystem.get(jobConf);
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " +
- inDir.toString());
- }
- jobConf.set("distcp.job.dir", jobDirectory.toString());
-
- jobConf.setInputPath(inDir);
- jobConf.setOutputPath(logPath);
-
- // create new sequence-files for holding paths
- ArrayList<Path> pathList = new ArrayList<Path>();
- ArrayList<String> finalPathList = new ArrayList<String>();
- pathList.add(new Path(srcPath));
- long totalBytes = 0;
- //int part = 0;
- while(!pathList.isEmpty()) {
- Path top = pathList.remove(0);
- if (srcfs.isFile(top)) {
- totalBytes += srcfs.getFileStatus(top).getLen();
- top = makeRelative(rootPath, top);
- finalPathList.add(top.toString());
- } else {
- Path[] paths = srcfs.listPaths(top);
- for (int idx = 0; idx < paths.length; idx++) {
- pathList.add(paths[idx]);
- }
+ Path destParent = dstpath.getParent();
+ if (totfiles > 1) {
+ // create directories to hold destination file
+ if (destParent != null && !destFileSys.mkdirs(destParent)) {
+ throw new IOException("mkdirs failed to create " + destParent);
}
+ } else {
+ // Copying a single file; use dst path provided by user as destination
+ // rather than destination directory
+ dstpath = destParent;
}
-
- // ideal number of maps is one per file (if the map-launching overhead
- // were 0. It is limited by jobtrackers handling capacity, which lets say
- // is MAX_NUM_MAPS. It is also limited by MAX_MAPS_PER_NODE. Also for
- // small files it is better to determine number of maps by amount of
- // data per map.
- int numMaps = finalPathList.size();
- if (numMaps > MAX_NUM_MAPS) {
- numMaps = MAX_NUM_MAPS;
- }
-
- JobClient client = new JobClient(jobConf);
- jobConf.setNumMapTasks(getMapCount(numMaps, totalBytes, client));
-
- for(int idx=0; idx < numMaps; ++idx) {
- Path file = new Path(inDir, "part"+idx);
- SequenceFile.Writer writer =
- SequenceFile.createWriter(fileSys, conf, file, Text.class, Text.class);
- for (int ipath = idx; ipath < finalPathList.size(); ipath += numMaps) {
- String path = finalPathList.get(ipath);
- writer.append(new Text(path), new Text(""));
+
+ FSDataInputStream in = null;
+ FSDataOutputStream out = null;
+ try {
+ if (destFileSys.exists(dstpath)
+ && (!overwrite && !(update
+ && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
+ reporter.setStatus("Skipped " + srcstat.getPath());
+ return;
}
- writer.close();
- }
- finalPathList = null;
-
- }
-
- @Override
- public void cleanup(Configuration conf, JobConf jobConf,
- String srcPath, String destPath)
- throws IOException
- {
- //Clean up jobDirectory
- Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
- FileSystem fs = FileSystem.get(jobConf);
-
- if (!jobDirectory.equals(new Path("/"))) {
- FileUtil.fullyDelete(fs, jobDirectory);
+ // open src file
+ in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
+ long totalBytes = srcstat.getLen();
+ // open dst file
+ out = preserve_status
+ ? destFileSys.create(dstpath, true, sizeBuf, srcstat.getReplication(),
+ srcstat.getBlockSize(), reporter)
+ : destFileSys.create(dstpath, reporter);
+ // copy file
+ int nread;
+ while ((nread = in.read(buffer)) >= 0) {
+ out.write(buffer, 0, nread);
+ bytesSinceLastReport += nread;
+ if (bytesSinceLastReport > reportInterval) {
+ totalBytesCopied += bytesSinceLastReport;
+ bytesSinceLastReport = 0L;
+ reporter.setStatus("Copy " + dstpath + ": " +
+ pcntfmt.format(100.0 * totalBytesCopied / totalBytes) + "% and "
+ + StringUtils.humanReadableInt(totalBytesCopied) + " bytes");
+ }
+ }
+ } finally {
+ if (in != null)
+ in.close();
+ if (out != null)
+ out.close();
}
+ // report at least once for each file
+ totalBytesCopied += bytesSinceLastReport;
+ bytesSinceLastReport = 0L;
+ reporter.setStatus("Finished. Bytes copied: " +
+ StringUtils.humanReadableInt(totalBytesCopied));
}
-
+
/** Mapper configuration.
* Extracts source and destination file system, as well as
* top-level paths on source and destination directories.
* Gets the named file systems, to be used later in map.
*/
- @Override
- public void configure(JobConf job)
+ public void configure(JobConf job)
{
- String srcfs = job.get("copy.src.fs", "local");
- String destfs = job.get("copy.dest.fs", "local");
- srcPath = new Path(job.get("copy.src.path", "/"));
destPath = new Path(job.get("copy.dest.path", "/"));
try {
- srcFileSys = FileSystem.get(new URI(srcfs), job);
- destFileSys = FileSystem.get(new URI(destfs), job);
- } catch (URISyntaxException e) {
- throw new RuntimeException("Failed parse of src or dest URI.", e);
+ destFileSys = destPath.getFileSystem(job);
} catch (IOException ex) {
throw new RuntimeException("Unable to get the named file system.", ex);
}
- sizeBuf = job.getInt("copy.buf.size", 4096);
+ sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
buffer = new byte[sizeBuf];
- ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
+ ignoreReadFailures = job.getBoolean("distcp.ignore.read.failures", false);
+ preserve_status = job.getBoolean("distcp.preserve.status.info", false);
+ update = job.getBoolean("distcp.overwrite.ifnewer", false);
+ overwrite = !update && job.getBoolean("distcp.overwrite.always", false);
+ this.job = job;
}
-
+
/** Map method. Copies one file from source file system to destination.
- * @param key source file name
- * @param value not-used.
- * @param out not-used.
+ * @param key src len
+ * @param value FilePair (FileStatus src, Path dst)
+ * @param out Log of failed copies
* @param reporter
*/
- public void map(Text key,
- Writable value,
+ public void map(LongWritable key,
+ FilePair value,
OutputCollector<WritableComparable, Text> out,
Reporter reporter) throws IOException {
- String src = key.toString();
+ FileStatus srcstat = value.input;
+ Path dstpath = value.output;
try {
- copy(src, reporter);
+ copy(srcstat, dstpath, reporter);
} catch (IOException except) {
- out.collect(null, new Text("Failed to copy " + src + " : " +
- StringUtils.stringifyException(except)));
+ out.collect(null, new Text("Failed to copy " + srcstat.getPath() +
+ " : " + StringUtils.stringifyException(except)));
if (ignoreReadFailures) {
- reporter.setStatus("Failed to copy " + src + " : " +
- StringUtils.stringifyException(except));
+ reporter.setStatus("Failed to copy " + srcstat.getPath() + " : " +
+ StringUtils.stringifyException(except));
try {
- destFileSys.delete(new Path(destPath, src));
+ destFileSys.delete(dstpath);
} catch (Throwable ex) {
// ignore, we are just cleaning up
+ LOG.debug("Ignoring cleanup exception", ex);
}
} else {
throw except;
}
}
}
-
- @Override
- public void close() {
- // nothing
- }
-
- }
-
- public static class HTTPCopyFilesMapper extends CopyFilesMapper
- implements Mapper
- {
- private URI srcURI = null;
- private FileSystem destFileSys = null;
- private Path destPath = null;
- private JobConf jobConf = null;
- private boolean ignoreReadFailures;
-
- /**
- * Initialize HTTPCopyFileMapper specific job.
- * @param conf : The dfs/mapred configuration.
- * @param jobConf : The handle to the jobConf object to be initialized.
- * @param srcPaths : The source URI.
- * @param destPath : The destination URI.
- * @param logPath : The log Path.
- * @param ignoreReadFailures : Ignore read failures?
- */
- @Override
- public void setup(Configuration conf, JobConf jobConf,
- String[] srcPaths, String destPath,
- Path logPath, boolean ignoreReadFailures)
- throws IOException
- {
- //Destination
- URI destURI = toURI(destPath);
- jobConf.set("copy.dest.fs", destURI.toString());
- destPath = destURI.getPath();
- jobConf.set("copy.dest.path", destPath);
-
- //Setup the MR-job configuration
- jobConf.setSpeculativeExecution(false);
-
- jobConf.setInputFormat(SequenceFileInputFormat.class);
-
- jobConf.setMapperClass(HTTPCopyFilesMapper.class);
-
- JobClient client = new JobClient(jobConf);
- jobConf.setNumMapTasks(getMapCount(srcPaths.length, -1, client));
-
- jobConf.setBoolean(readFailuresAttribute, ignoreReadFailures);
-
- FileSystem fileSystem = FileSystem.get(conf);
- Random r = new Random();
- Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" +
- Integer.toString(r.nextInt(Integer.MAX_VALUE), 36));
- Path jobInputDir = new Path(jobDirectory, "in");
- if (!fileSystem.mkdirs(jobInputDir)) {
- throw new IOException("Mkdirs failed to create " + jobInputDir.toString());
- }
- jobConf.setInputPath(jobInputDir);
-
- jobConf.set("distcp.job.dir", jobDirectory.toString());
- jobConf.setOutputPath(logPath);
-
- for(int i=0; i < srcPaths.length; ++i) {
- Path ipFile = new Path(jobInputDir, "part" + i);
- SequenceFile.Writer writer =
- SequenceFile.createWriter(fileSystem, conf, ipFile,
- Text.class, Text.class);
- writer.append(new Text(srcPaths[i]), new Text(""));
- writer.close();
- }
- }
-
- @Override
- public void cleanup(Configuration conf, JobConf jobConf,
- String srcPath, String destPath)
- throws IOException
- {
- //Clean up jobDirectory
- Path jobDirectory = new Path(jobConf.get("distcp.job.dir", "/"));
- FileSystem fs = FileSystem.get(jobConf);
-
- if (!jobDirectory.equals(new Path("/"))) {
- FileUtil.fullyDelete(fs, jobDirectory);
- }
- }
-
- @Override
- public void configure(JobConf job)
- {
- //Save jobConf
- jobConf = job;
-
- try {
- //Destination
- destFileSys = FileSystem.get(URI.create(job.get("copy.dest.fs", "file:///")), job);
- destPath = new Path(job.get("copy.dest.path", "/"));
- if (!destFileSys.exists(destPath)) {
- return;
- }
- } catch(IOException ioe) {
- return;
- }
-
- ignoreReadFailures = job.getBoolean(readFailuresAttribute, false);
- }
-
- public void map(WritableComparable key,
- Writable val,
- OutputCollector out,
- Reporter reporter) throws IOException
- {
- //The url of the file
- try {
- srcURI = new URI(((Text)key).toString());
-
- //Construct the complete destination path
- File urlPath = new File(srcURI.getPath());
- Path destinationPath = new Path(destPath, urlPath.getName());
-
- //Copy the file
- URL url = srcURI.toURL();
- HttpURLConnection connection = (HttpURLConnection)url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- int bufferSize = jobConf.getInt("io.file.buffer.size", 4096);
- byte[] buffer = new byte[bufferSize];
- BufferedInputStream is =
- new BufferedInputStream(connection.getInputStream());
-
- FSDataOutputStream os = destFileSys.create(destinationPath, true,
- bufferSize, (short)jobConf.getInt("dfs.replication", 3),
- jobConf.getLong("dfs.block.size", 67108864));
-
- int readBytes = 0;
- while((readBytes = is.read(buffer, 0, bufferSize)) != -1) {
- os.write(buffer, 0, readBytes);
- }
-
- is.close();
- os.close();
- connection.disconnect();
-
- reporter.setStatus("Copied: " + srcURI.toString() +
- " to: " + destinationPath.toString());
-
- } catch (URISyntaxException e) {
- handleException(reporter, (Text)key, e);
- } catch (IOException ioe) {
- handleException(reporter,(Text)key, ioe);
- }
- }
- /* handle exceptions */
- private void handleException( Reporter reporter, Text key, Throwable e )
- throws IOException {
- String errMsg = "Failed to copy from: " + key;
- reporter.setStatus(errMsg);
- if ( !ignoreReadFailures ) {
- throw new IOException(errMsg);
- }
- }
+ public void close() { }
+
}
-
- /**
- * Factory to create requisite Mapper objects for distcp.
- */
- private static class CopyMapperFactory
- {
- public static CopyFilesMapper getMapper(Configuration conf, String protocol)
- throws IOException
- {
- CopyFilesMapper mapper = null;
- if (protocol == null) {
- // Use 'default' filesystem.
- protocol = FileSystem.get(conf).getUri().getScheme();
- }
- protocol = protocol.toLowerCase();
-
- if (HDFS.equalsIgnoreCase(protocol) || "file".equalsIgnoreCase(protocol) ||
- S3.equalsIgnoreCase(protocol)) {
- mapper = new FSCopyFilesMapper();
- } else if ("http".equalsIgnoreCase(protocol)) {
- mapper = new HTTPCopyFilesMapper();
- }
-
- return mapper;
- }
- }
-
- private static String[] fetchSrcURIs(Configuration conf, URI srcListURI) throws IOException
- {
- ArrayList<String> uris = new ArrayList<String>();
- BufferedReader fis = null;
-
- String srcListURIScheme = srcListURI.getScheme();
- String srcListURIPath = srcListURI.getPath();
-
- if ("file".equalsIgnoreCase(srcListURIScheme)) {
- fis = new BufferedReader(new FileReader(srcListURIPath));
- } else if (srcListURIScheme != null &&
- HDFS.equalsIgnoreCase(srcListURIScheme)) {
- FileSystem fs = FileSystem.get(srcListURI, conf);
- fis = new BufferedReader(
- new InputStreamReader(fs.open(new Path(srcListURIPath)))
- );
- } else if ("http".equalsIgnoreCase(srcListURIScheme)) {
- //Copy the file
- URL url = srcListURI.toURL();
- HttpURLConnection connection = (HttpURLConnection)url.openConnection();
- connection.setRequestMethod("GET");
- connection.connect();
-
- fis = new BufferedReader(
- new InputStreamReader(connection.getInputStream())
- );
- } else {
- throw new IOException("Unsupported source list uri: " + srcListURIScheme);
- }
+ private static List<Path> fetchFileList(Configuration conf, Path srcList)
+ throws IOException {
+ List<Path> result = new ArrayList<Path>();
+ FileSystem fs = srcList.getFileSystem(conf);
+ DataInputStream raw = fs.open(srcList);
+ BufferedReader input = null;
try {
- String uri = null;
- while((uri = fis.readLine()) != null) {
- if (!uri.startsWith("#")) {
- // Check source is parseable as URI by passing via getPathURI.
- toURI(uri);
- uris.add(uri);
- }
- }
- } catch (Exception e) {
- if (fis != null) {
- fis.close();
+ input = new BufferedReader(new InputStreamReader(raw));
+ String line = input.readLine();
+ while (line != null) {
+ result.add(new Path(line));
+ line = input.readLine();
}
} finally {
- if (fis != null) {
- fis.close();
+ if (input != null) {
+ input.close();
}
}
-
- return !uris.isEmpty()? uris.toArray(new String[0]): null;
+ return result;
}
-
- /**
- * Helper function to parse input file and return source urls for
- * a given protocol.
- * @param protocol The protocol for which to find source urls.
- * @param inputFilePath The file containing the urls.
- * @return
- */
- private static String[] parseInputFile(String protocol, String[] uris)
- throws IOException
- {
- ArrayList<String> protocolURIs = new ArrayList<String>(uris.length);
-
- for(int i=0; i < uris.length; ++i) {
- // uri must start w/ protocol
- if (uris[i].startsWith(protocol)) {
- protocolURIs.add(uris[i]);
- }
- }
-
- return !protocolURIs.isEmpty()? protocolURIs.toArray(new String[0]): null;
- }
-
- public static URI toURI(final String u) throws IOException {
- URI result = null;
- try {
- result = new URI(u);
- } catch (URISyntaxException ex) {
- throw new IOException("Path does not parse as URI: " + u);
+
+ @Deprecated
+ public static void copy(Configuration conf, String srcPath,
+ String destPath, Path logPath,
+ boolean srcAsList, boolean ignoreReadFailures)
+ throws IOException {
+ final Path src = new Path(srcPath);
+ List<Path> tmp = new ArrayList<Path>();
+ if (srcAsList) {
+ tmp.addAll(fetchFileList(conf, src));
+ } else {
+ tmp.add(src);
}
- return result;
+ EnumSet<cpOpts> flags = ignoreReadFailures
+ ? EnumSet.of(cpOpts.IGNORE_READ_FAILURES)
+ : EnumSet.noneOf(cpOpts.class);
+ copy(conf, tmp, new Path(destPath), logPath, flags);
}
-
+
/**
* Driver to copy srcPath to destPath depending on required protocol.
- * @param conf Configuration
- * @param srcPath Source path URL
- * @param destPath Destination path URL
- * @param logPath the log path
- * @param srcAsList List of source URLs to copy.
- * @param ignoreReadFailures True if we are to ignore read failures.
+ * @param srcPaths list of source paths
+ * @param destPath Destination path
+ * @param logPath Log output directory
+ * @param flags Command-line flags
*/
- public static void copy(Configuration conf, String srcPath,
- String destPath, Path logPath,
- boolean srcAsList, boolean ignoreReadFailures)
- throws IOException
- {
+ public static void copy(Configuration conf, List<Path> srcPaths,
+ Path destPath, Path logPath,
+ EnumSet<cpOpts> flags) throws IOException {
//Job configuration
- JobConf jobConf = new JobConf(conf, CopyFiles.class);
- jobConf.setJobName("distcp");
-
- //Sanity check for srcPath/destPath
- URI srcURI = toURI(srcPath);
- toURI(destPath);
-
- // default logPath
- if (logPath == null) {
- logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
- System.currentTimeMillis());
- }
+ JobConf job = new JobConf(conf, CopyFiles.class);
+ job.setJobName("distcp");
- //Create the task-specific mapper
- CopyFilesMapper mapper = null;
- String[] srcPaths = null;
- if (srcAsList) {
- //Ugly?!
-
- //Source paths
- srcPaths = fetchSrcURIs(conf, srcURI);
-
- // Protocol - 'hdfs://'
- String[] dfsUrls = parseInputFile(HDFS, srcPaths);
- if (dfsUrls != null) {
- for(int i=0; i < dfsUrls.length; ++i) {
- copy(conf, dfsUrls[i], destPath, logPath, false, ignoreReadFailures);
- }
- }
-
- // Protocol - 'file://'
- String[] localUrls = parseInputFile("file", srcPaths);
- if (localUrls != null) {
- for(int i=0; i < localUrls.length; ++i) {
- copy(conf, localUrls[i], destPath, logPath, false, ignoreReadFailures);
- }
- }
-
- // Protocol - 'http://'
- String[] httpUrls = parseInputFile("http", srcPaths);
- if (httpUrls != null) {
- srcPaths = httpUrls;
- mapper = CopyMapperFactory.getMapper(conf, "http");
- } else {
- // Protocol - 's3://'
- String[] s3Urls = parseInputFile(S3, srcPaths);
- if (s3Urls != null) {
- srcPaths = s3Urls;
- mapper = CopyMapperFactory.getMapper(conf, S3);
- } else {
- return;
- }
+ //Sanity check for srcPath/destPath
+ List<IOException> rslt = new ArrayList<IOException>();
+ for (Path p : srcPaths) {
+ FileSystem fs = p.getFileSystem(conf);
+ if (!fs.exists(p)) {
+ rslt.add(new IOException("Input source " + p + " does not exist."));
}
-
- // TODO: Add support for URIs w/o scheme (In this case, use the 'default'
- // filesystem).
-
- } else {
- //Single source - ugly!
- srcPaths = new String [] {srcPath};
- mapper = CopyMapperFactory.getMapper(conf, srcURI.getScheme());
}
-
+ if (!rslt.isEmpty()) {
+ throw new InvalidInputException(rslt);
+ }
+
//Initialize the mapper
- mapper.setup(conf, jobConf, srcPaths, destPath, logPath, ignoreReadFailures);
-
- //We are good to go!
try {
- JobClient.runJob(jobConf);
+ setup(conf, job, srcPaths, destPath, logPath, flags);
+ JobClient.runJob(job);
} finally {
- mapper.cleanup(conf, jobConf, srcPath, destPath);
+ cleanup(conf, job);
}
}
-
+
+ /**
+ * Copy options. run() adds them for the -i, -p, -overwrite and -update
+ * switches respectively, and setup() records each one in the job
+ * configuration.
+ */
+ enum cpOpts {
+ IGNORE_READ_FAILURES,
+ PRESERVE_STATUS,
+ OVERWRITE,
+ UPDATE
+ }
+
/**
* This is the main driver for recursively copying directories
* across file systems. It takes at least two cmdline parameters. A source
@@ -809,84 +456,255 @@
* reduce is empty.
*/
public int run(String[] args) throws Exception {
- String srcPath = null;
- String destPath = null;
+ List<Path> srcPath = new ArrayList<Path>();
+ Path destPath = null;
Path logPath = null;
- boolean ignoreReadFailures = false;
- boolean srcAsList = false;
-
+ EnumSet<cpOpts> flags = EnumSet.noneOf(cpOpts.class);
+
for (int idx = 0; idx < args.length; idx++) {
if ("-i".equals(args[idx])) {
- ignoreReadFailures = true;
+ flags.add(cpOpts.IGNORE_READ_FAILURES);
+ } else if ("-p".equals(args[idx])) {
+ flags.add(cpOpts.PRESERVE_STATUS);
+ } else if ("-overwrite".equals(args[idx])) {
+ flags.add(cpOpts.OVERWRITE);
+ } else if ("-update".equals(args[idx])) {
+ flags.add(cpOpts.UPDATE);
} else if ("-f".equals(args[idx])) {
- srcAsList = true;
- } else if (srcPath == null) {
- srcPath = args[idx];
- } else if (destPath == null) {
- destPath = args[idx];
+ if (++idx == args.length) {
+ System.out.println("urilist_uri not specified");
+ System.out.println(usage);
+ return -1;
+ }
+ srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
+
} else if ("-log".equals(args[idx])) {
- logPath = new Path(toURI(args[++idx]).getPath());
- } else {
+ if (++idx == args.length) {
+ System.out.println("logdir not specified");
+ System.out.println(usage);
+ return -1;
+ }
+ logPath = new Path(args[idx]);
+ } else if ('-' == args[idx].codePointAt(0)) {
+ System.out.println("Invalid switch " + args[idx]);
System.out.println(usage);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
+ } else if (idx == args.length -1) {
+ destPath = new Path(args[idx]);
+ } else {
+ srcPath.add(new Path(args[idx]));
}
}
-
// mandatory command-line parameters
- if (srcPath == null || destPath == null) {
+ if (srcPath.isEmpty() || destPath == null) {
+ System.out.println("Missing " + (destPath == null ? "dst path" : "src"));
System.out.println(usage);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
-
- // default logPath
- if (logPath == null) {
- logPath = new Path(toURI(destPath).getPath() + "/_distcp_logs_" +
- System.currentTimeMillis());
- System.out.println("Using default logPath: " + logPath);
- }
-
- // verify if srcPath, destPath are valid and logPath is valid and doesnot exist
- try {
- URI srcURI = toURI(srcPath);
- FileSystem srcfs = FileSystem.get(srcURI, conf);
- if (!srcfs.exists(new Path(srcURI.getPath()))) {
- System.out.println(srcPath + " does not exist.");
- return -1;
- }
-
- URI destURI = toURI(destPath);
- FileSystem destfs = FileSystem.get(destURI, conf);
- if (destfs.exists(new Path(destURI.getPath()))) {
- System.out.println("WARNING: " + destPath + " already exists.");
- }
-
- FileSystem logfs = FileSystem.get(logPath.toUri(), conf);
- if (logfs.exists(logPath)) {
- System.out.println("ERROR: " + logPath + " already exists.");
- return -1;
- }
- } catch (Exception e) {
- System.err.println("Copy failed: " + StringUtils.stringifyException(e));
+ // incompatible command-line flags
+ if (flags.contains(cpOpts.OVERWRITE) && flags.contains(cpOpts.UPDATE)) {
+ System.out.println("Conflicting overwrite policies");
+ System.out.println(usage);
return -1;
}
-
try {
- copy(conf, srcPath, destPath, logPath, srcAsList, ignoreReadFailures);
+ copy(conf, srcPath, destPath, logPath, flags);
} catch (Exception e) {
- System.err.println("Copy failed: "+StringUtils.stringifyException(e));
+ System.err.println("Copy failed: " + StringUtils.stringifyException(e));
return -1;
}
-
return 0;
}
-
+
public static void main(String[] args) throws Exception {
- JobConf job = new JobConf(new Configuration(), CopyFiles.class);
- CopyFiles distcp = new CopyFiles();
- distcp.setConf(job);
+ JobConf job = new JobConf(CopyFiles.class);
+ CopyFiles distcp = new CopyFiles(job);
int res = ToolRunner.run(distcp, args);
System.exit(res);
}
+
+ /**
+ * Make a path relative with respect to a root path.
+ * A path that is not absolute is returned unchanged. An absolute path is
+ * expected to descend from root: its leading components must equal the
+ * components of root, and the components that follow form the result.
+ * Only the path portion of each URI is compared.
+ * @param root the path that absPath is made relative to
+ * @param absPath the path to make relative
+ * @return the part of absPath below root, or null if absPath does not
+ * descend from root
+ */
+ public static Path makeRelative(Path root, Path absPath) {
+ if (!absPath.isAbsolute()) { return absPath; }
+ StringTokenizer rootTokens =
+ new StringTokenizer(root.toUri().getPath(), "/");
+ StringTokenizer pathTokens =
+ new StringTokenizer(absPath.toUri().getPath(), "/");
+ while (rootTokens.hasMoreTokens()) {
+ // absPath ran out of components before root did, so it cannot descend
+ // from root. Return null as documented instead of letting the tokenizer
+ // throw NoSuchElementException.
+ if (!pathTokens.hasMoreTokens()) { return null; }
+ if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
+ return null;
+ }
+ }
+ // NOTE(review): when absPath has exactly the components of root, the
+ // result is built from an empty string -- confirm Path accepts that.
+ StringBuilder sb = new StringBuilder();
+ while (pathTokens.hasMoreTokens()) {
+ sb.append(pathTokens.nextToken());
+ if (pathTokens.hasMoreTokens()) { sb.append("/"); }
+ }
+ return new Path(sb.toString());
+ }
+
+ /**
+ * Calculate how many maps to run.
+ * The count is the cumulative size of the copy / BYTES_PER_MAP, capped at
+ * MAX_MAPS_PER_NODE * nodes in the cluster, and never less than 1.
+ * @param totalBytes Count of total bytes for job
+ * @param numNodes the number of nodes in cluster
+ * @return Count of maps to run.
+ */
+ private static int getMapCount(long totalBytes, int numNodes) {
+ // Do the arithmetic in long: narrowing the quotient to int first, or
+ // multiplying two ints for the cap, could wrap around for very large
+ // inputs and silently collapse the result to a single map.
+ long numMaps = totalBytes / BYTES_PER_MAP;
+ numMaps = Math.min(numMaps, (long) numNodes * MAX_MAPS_PER_NODE);
+ numMaps = Math.min(numMaps, (long) Integer.MAX_VALUE);
+ return (int) Math.max(numMaps, 1L);
+ }
+ /**
+ * Delete the temporary dir containing the src file list.
+ * Does nothing when the job configuration names no such directory.
+ * @param conf The dfs/mapred configuration (not read by this method)
+ * @param jobConf The handle to the jobConf object
+ * @throws IOException if the file system of the directory cannot be obtained
+ */
+ private static void cleanup(Configuration conf, JobConf jobConf)
+ throws IOException {
+ //Clean up jobDirectory
+ // The key must be the one setup() stores the directory under; the former
+ // spelling "distdp.job.dir" never matched, so nothing was ever deleted.
+ String jobDirName = jobConf.get("distcp.job.dir");
+ if (jobDirName != null) {
+ Path jobDirectory = new Path(jobDirName);
+ FileSystem fs = jobDirectory.getFileSystem(jobConf);
+ FileUtil.fullyDelete(fs, jobDirectory);
+ }
+ }
+
+ /**
+ * Initialize FSCopyFilesMapper specific job-configuration: record the copy
+ * options in jobConf, choose the log directory, write the list of files to
+ * copy into the job directory and set the number of map tasks.
+ * @param conf : The dfs/mapred configuration.
+ * @param jobConf : The handle to the jobConf object to be initialized.
+ * @param srcPaths : The source URIs.
+ * @param destPath : The destination URI.
+ * @param logPath : Log output directory. If null, a "_distcp_logs_"
+ * directory is used, placed inside destPath when destPath is an existing
+ * directory and next to destPath otherwise.
+ * @param flags : Command-line flags
+ * @throws IOException if a file system operation fails or the destination
+ * directory cannot be created
+ */
+ private static void setup(Configuration conf, JobConf jobConf,
+ List<Path> srcPaths, Path destPath,
+ Path logPath, EnumSet<cpOpts> flags)
+ throws IOException {
+ // UPDATE wins if both overwrite policies are present in flags.
+ final boolean update = flags.contains(cpOpts.UPDATE);
+ final boolean overwrite = !update && flags.contains(cpOpts.OVERWRITE);
+ jobConf.set("copy.dest.path", destPath.toUri().toString());
+
+ // turn off speculative execution, because DFS doesn't handle
+ // multiple writers to the same file.
+ jobConf.setSpeculativeExecution(false);
+
+ jobConf.setInputFormat(CopyInputFormat.class);
+
+ jobConf.setOutputKeyClass(Text.class);
+ jobConf.setOutputValueClass(Text.class);
+
+ jobConf.setMapperClass(FSCopyFilesMapper.class);
+
+ jobConf.setNumReduceTasks(0);
+ jobConf.setBoolean("distcp.ignore.read.failures",
+ flags.contains(cpOpts.IGNORE_READ_FAILURES));
+ jobConf.setBoolean("distcp.preserve.status.info",
+ flags.contains(cpOpts.PRESERVE_STATUS));
+ jobConf.setBoolean("distcp.overwrite.ifnewer", update);
+ jobConf.setBoolean("distcp.overwrite.always", overwrite);
+
+ // The random id names both the per-job directory and the default log
+ // directory.
+ Random r = new Random();
+ String randomId = Integer.toString(r.nextInt(Integer.MAX_VALUE), 36);
+ Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
+ jobConf.set("distcp.job.dir", jobDirectory.toString());
+ Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
+ jobConf.set("distcp.src.list", srcfilelist.toString());
+
+ // default logPath
+ FileSystem dstfs = destPath.getFileSystem(conf);
+ if (logPath == null) {
+ String filename = "_distcp_logs_" + randomId;
+ // The destination may legitimately be missing (see special_case below),
+ // and FileSystem.getFileStatus is documented to fail for a path that
+ // does not exist, so only ask for the status of an existing destination.
+ if (dstfs.exists(destPath) && dstfs.getFileStatus(destPath).isDir()) {
+ logPath = new Path(destPath, filename);
+ } else {
+ logPath = new Path(destPath.getParent(), filename);
+ }
+ }
+ jobConf.setOutputPath(logPath);
+
+ // create src list
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ jobDirectory.getFileSystem(jobConf), jobConf, srcfilelist,
+ LongWritable.class, FilePair.class,
+ SequenceFile.CompressionType.NONE);
+
+ // totals over the whole job: number of files and number of bytes
+ int cnfiles = 0;
+ long cbsize = 0L;
+ try {
+ // handle the case where the destination directory doesn't exist
+ // and we've only a single src directory OR we're updating/overwriting
+ // the contents of the destination directory.
+ final boolean special_case =
+ (srcPaths.size() == 1 && !dstfs.exists(destPath))
+ || update || overwrite;
+ // files and bytes listed since the last sync marker
+ int cnsyncf = 0;
+ long cbsyncs = 0L;
+ for (Path p : srcPaths) {
+ // Destination names are formed relative to root: by default the parent
+ // of the source, so the source's own name is kept under destPath.
+ Path root = p.getParent();
+ FileSystem fs = p.getFileSystem(conf);
+
+ // In the special case the contents of a source directory go directly
+ // under destPath.
+ if (special_case && fs.getFileStatus(p).isDir()) {
+ root = p;
+ }
+
+ // iterative traversal of everything at and below p
+ Stack<Path> pathstack = new Stack<Path>();
+ pathstack.push(p);
+ while (!pathstack.empty()) {
+ for (FileStatus stat : fs.listStatus(pathstack.pop())) {
+ if (stat.isDir()) {
+ pathstack.push(stat.getPath());
+ } else {
+ ++cnsyncf;
+ cbsyncs += stat.getLen();
+ ++cnfiles;
+ cbsize += stat.getLen();
+ }
+ // NOTE(review): the sync markers presumably give CopyInputFormat
+ // places to split the list -- confirm against CopyInputFormat.
+ if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+ writer.sync();
+ cnsyncf = 0;
+ cbsyncs = 0L;
+ }
+ // directories are listed too, with a length of 0
+ writer.append(new LongWritable(stat.isDir() ? 0 : stat.getLen()),
+ new FilePair(stat, new Path(destPath,
+ makeRelative(root, stat.getPath()))));
+ }
+ }
+ }
+ } finally {
+ writer.close();
+ }
+
+ // create dest path dir if copying > 1 file
+ if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
+ throw new IOException("Failed to create " + destPath);
+ }
+
+ jobConf.setInt("distcp.file.count", cnfiles);
+ jobConf.setLong("distcp.total.size", cbsize);
+
+ JobClient client = new JobClient(jobConf);
+ jobConf.setNumMapTasks(getMapCount(cbsize,
+ client.getClusterStatus().getTaskTrackers()));
+ }
+
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=573166&r1=573165&r2=573166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Thu Sep 6 00:04:31 2007
@@ -50,12 +50,13 @@
private static String[] dirNames = {
"zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
};
- private String name = "";
- private int size;
- private long seed;
-
+ private final String name;
+ private int size = 0;
+ private long seed = 0L;
+
MyFile() {
int nLevels = gen.nextInt(MAX_LEVELS);
+ String xname = "";
if (nLevels != 0) {
int[] levels = new int[nLevels];
for (int idx = 0; idx < nLevels; idx++) {
@@ -66,20 +67,23 @@
sb.append(dirNames[levels[idx]]);
sb.append("/");
}
- name = sb.toString();
+ xname = sb.toString();
}
- long fidx = -1;
- while (fidx < 0) { fidx = gen.nextLong(); }
- name = name + Long.toString(fidx);
- size = gen.nextInt(MAX_SIZE);
- seed = gen.nextLong();
+ long fidx = gen.nextLong() & Long.MAX_VALUE;
+ name = xname + Long.toString(fidx);
+ reset();
+ }
+ /** pick a new size and a new seed, each different from its current value. */
+ void reset() {
+ final int prevSize = size;
+ size = gen.nextInt(MAX_SIZE);
+ while (size == prevSize) { size = gen.nextInt(MAX_SIZE); }
+ final long prevSeed = seed;
+ seed = gen.nextLong() & Long.MAX_VALUE;
+ while (seed == prevSeed) { seed = gen.nextLong() & Long.MAX_VALUE; }
 }
-
String getName() { return name; }
int getSize() { return size; }
long getSeed() { return seed; }
}
-
+
public TestCopyFiles(String testName) {
super(testName);
}
@@ -126,7 +130,7 @@
return files;
}
-
+
/** check if the files have been copied correctly. */
private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
throws IOException {
@@ -155,7 +159,67 @@
return true;
}
-
+
+ /**
+ * overwrite the first nupdate files under topdir with new contents.
+ * Each file is reset to a new size and seed before it is rewritten, so
+ * files afterwards describes the rewritten data.
+ */
+ private static void updateFiles(String fsname, String topdir, MyFile[] files,
+ int nupdate) throws IOException {
+ assert nupdate <= NFILES;
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getNamed(fsname, conf);
+ Path root = new Path(topdir);
+
+ for (int idx = 0; idx < nupdate; ++idx) {
+ Path fPath = new Path(root, files[idx].getName());
+ // overwrite file
+ assertTrue(fPath.toString() + " does not exist", fs.exists(fPath));
+ FSDataOutputStream out = fs.create(fPath);
+ // close the stream even when generating or writing the data fails
+ try {
+ files[idx].reset();
+ byte[] toWrite = new byte[files[idx].getSize()];
+ Random rb = new Random(files[idx].getSeed());
+ rb.nextBytes(toWrite);
+ out.write(toWrite);
+ } finally {
+ out.close();
+ }
+ }
+ }
+
+ /** collect the status of each of the NFILES files as found under topdir. */
+ private static FileStatus[] getFileStatus(String namenode,
+ String topdir, MyFile[] files) throws IOException {
+ final FileSystem fs = FileSystem.getNamed(namenode, new Configuration());
+ final Path root = new Path(topdir);
+ final FileStatus[] statuses = new FileStatus[NFILES];
+ int idx = 0;
+ while (idx < NFILES) {
+ final Path file = new Path(root, files[idx].getName());
+ statuses[idx] = fs.getFileStatus(file);
+ ++idx;
+ }
+ return statuses;
+ }
+
+ /**
+ * compare current modification times under topdir with the ones in old:
+ * the first nupdate files must be newer, the remaining ones unchanged.
+ */
+ private static boolean checkUpdate(FileStatus[] old, String namenode,
+ String topdir, MyFile[] upd, final int nupdate) throws IOException {
+
+ final FileSystem fs = FileSystem.getNamed(namenode, new Configuration());
+ final Path root = new Path(topdir);
+
+ int idx = 0;
+ // overwrote updated files
+ while (idx < nupdate) {
+ final long current =
+ fs.getFileStatus(new Path(root, upd[idx].getName()))
+ .getModificationTime();
+ if (!(current > old[idx].getModificationTime())) {
+ return false;
+ }
+ ++idx;
+ }
+ // did not overwrite files not updated
+ idx = nupdate;
+ while (idx < NFILES) {
+ final long current =
+ fs.getFileStatus(new Path(root, upd[idx].getName()))
+ .getModificationTime();
+ if (current != old[idx].getModificationTime()) {
+ return false;
+ }
+ ++idx;
+ }
+ return true;
+ }
+
/** delete directory and everything underneath it.*/
private static void deldir(String fsname, String topdir)
throws IOException {
@@ -169,8 +233,8 @@
public void testCopyFromLocalToLocal() throws Exception {
MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new CopyFiles(new Configuration()),
- new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
- "file://"+TEST_ROOT_DIR+"/destdat"});
+ new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
deldir("local", TEST_ROOT_DIR+"/destdat");
@@ -187,14 +251,15 @@
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
- ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
- "hdfs://"+namenode+"/destdat",
- "-log",
- "hdfs://"+namenode+"/logs"});
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-log",
+ "hdfs://"+namenode+"/logs",
+ "hdfs://"+namenode+"/srcdat",
+ "hdfs://"+namenode+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(namenode, "/destdat", files));
FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
- assertTrue("Log directory doesnot exist.",
+ assertTrue("Log directory does not exist.",
fs.exists(new Path("hdfs://"+namenode+"/logs")));
deldir(namenode, "/destdat");
deldir(namenode, "/srcdat");
@@ -215,14 +280,15 @@
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
- ToolRunner.run(new CopyFiles(conf), new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
- "hdfs://"+namenode+"/destdat",
- "-log",
- "hdfs://"+namenode+"/logs"});
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-log",
+ "hdfs://"+namenode+"/logs",
+ "file:///"+TEST_ROOT_DIR+"/srcdat",
+ "hdfs://"+namenode+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(namenode, "/destdat", files));
FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
- assertTrue("Log directory doesnot exist.",
+ assertTrue("Log directory does not exist.",
fs.exists(new Path("hdfs://"+namenode+"/logs")));
deldir(namenode, "/destdat");
deldir(namenode, "/logs");
@@ -243,14 +309,15 @@
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
MyFile[] files = createFiles(namenode, "/srcdat");
- ToolRunner.run(new CopyFiles(conf), new String[] {"hdfs://"+namenode+"/srcdat",
- "file://"+TEST_ROOT_DIR+"/destdat",
- "-log",
- "/logs"});
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-log",
+ "/logs",
+ "hdfs://"+namenode+"/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
FileSystem fs = FileSystem.get(URI.create("hdfs://"+namenode+"/logs"), conf);
- assertTrue("Log directory doesnot exist.",
+ assertTrue("Log directory does not exist.",
fs.exists(new Path("/logs")));
deldir("local", TEST_ROOT_DIR+"/destdat");
deldir(namenode, "/logs");
@@ -260,5 +327,65 @@
if (cluster != null) { cluster.shutdown(); }
}
}
-
+
+ /** copy files from dfs file system to dfs, then again with -update and -overwrite */
+ public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
+ MiniDFSCluster cluster = null;
+ try {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ final String namenode = conf.get("fs.default.name", "local");
+ // nothing to exercise without a dfs
+ if ("local".equals(namenode)) { return; }
+
+ final String logUri = "hdfs://"+namenode+"/logs";
+ final String srcUri = "hdfs://"+namenode+"/srcdat";
+ final String dstUri = "hdfs://"+namenode+"/destdat";
+
+ // initial copy
+ MyFile[] files = createFiles(namenode, "/srcdat");
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-p", "-log", logUri, srcUri, dstUri});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ FileSystem fs = FileSystem.get(URI.create(logUri), conf);
+ assertTrue("Log directory does not exist.",
+ fs.exists(new Path(logUri)));
+
+ // remember the destination state, then change a quarter of the sources
+ FileStatus[] dchkpoint = getFileStatus(namenode, "/destdat", files);
+ final int nupdate = NFILES>>2;
+ updateFiles(namenode, "/srcdat", files, nupdate);
+ deldir(namenode, "/logs");
+
+ // -update: only the changed files may be rewritten
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-p", "-update", "-log", logUri, srcUri, dstUri});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ assertTrue("Update failed to replicate all changes in src",
+ checkUpdate(dchkpoint, namenode, "/destdat", files, nupdate));
+
+ // -overwrite: every file must be rewritten
+ deldir(namenode, "/logs");
+ ToolRunner.run(new CopyFiles(conf), new String[] {
+ "-p", "-overwrite", "-log", logUri, srcUri, dstUri});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ assertTrue("-overwrite didn't.",
+ checkUpdate(dchkpoint, namenode, "/destdat", files, NFILES));
+
+ deldir(namenode, "/destdat");
+ deldir(namenode, "/srcdat");
+ deldir(namenode, "/logs");
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
}