You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/01 22:45:17 UTC
svn commit: r691055 - in /hadoop/core/trunk: CHANGES.txt
src/test/org/apache/hadoop/fs/TestCopyFiles.java
src/tools/org/apache/hadoop/tools/DistCp.java
Author: cdouglas
Date: Mon Sep 1 13:45:17 2008
New Revision: 691055
URL: http://svn.apache.org/viewvc?rev=691055&view=rev
Log:
HADOOP-3939. Add an option to DistCp to delete files at the destination
not present at the source. Contributed by Tsz Wo (Nicholas) Sze.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 1 13:45:17 2008
@@ -114,6 +114,9 @@
HADOOP-3948. Separate name-node edits and fsimage directories.
(Lohit Vijayarenu via shv)
+ HADOOP-3939. Add an option to DistCp to delete files at the destination
+ not present at the source. (Tsz Wo (Nicholas) Sze via cdouglas)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Mon Sep 1 13:45:17 2008
@@ -18,12 +18,15 @@
package org.apache.hadoop.fs;
+import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.PrintStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.StringTokenizer;
import junit.framework.TestCase;
@@ -54,7 +57,6 @@
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
- //((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL);
}
static final URI LOCAL_FS = URI.create("file:///");
@@ -742,4 +744,85 @@
if (cluster != null) { cluster.shutdown(); }
}
}
-}
+
+  /**
+   * Verify "distcp -delete -update": after copying src_root over a dst_root
+   * that also holds dst-only files, the recursive listings of the two roots
+   * (with the root prefix stripped) must be identical.
+   */
+  public void testDelete() throws Exception {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      final URI nnURI = FileSystem.getDefaultUri(conf);
+      final String nnUri = nnURI.toString();
+      final FileSystem fs = FileSystem.get(nnURI, conf);
+
+      final DistCp distcp = new DistCp(conf);
+      final FsShell shell = new FsShell(conf);
+
+      final String srcrootdir = "/src_root";
+      final String dstrootdir = "/dst_root";
+
+      {//test -delete
+        createFiles(nnURI, srcrootdir);
+        createFiles(nnURI, dstrootdir);
+        //dst-only files that -delete must remove; "foo" is a string prefix of
+        //"foobar" without being its ancestor (presumably chosen deliberately
+        //to exercise the ancestor check in DistCp)
+        create(fs, new Path(dstrootdir, "foo"));
+        create(fs, new Path(dstrootdir, "foobar"));
+
+        System.out.println("srcrootdir=" + srcrootdir);
+        shell.run(new String[]{"-lsr", srcrootdir});
+
+        System.out.println("dstrootdir=" + dstrootdir);
+        shell.run(new String[]{"-lsr", dstrootdir});
+
+        //a failed copy must fail the test directly, not only via the
+        //listing comparison below
+        assertEquals(0, ToolRunner.run(distcp,
+            new String[]{"-delete", "-update", "-log", "/log",
+                         nnUri+srcrootdir, nnUri+dstrootdir}));
+
+        String srcresults = execCmd(shell, "-lsr", srcrootdir);
+        srcresults = removePrefix(srcresults, srcrootdir);
+        System.out.println("srcresults=" + srcresults);
+
+        String dstresults = execCmd(shell, "-lsr", dstrootdir);
+        dstresults = removePrefix(dstresults, dstrootdir);
+        System.out.println("dstresults=" + dstresults);
+
+        assertEquals(srcresults, dstresults);
+        deldir(fs, dstrootdir);
+        deldir(fs, srcrootdir);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
+ static void create(FileSystem fs, Path f) throws IOException {
+ FSDataOutputStream out = fs.create(f);
+ try {
+ byte[] b = new byte[1024 + RAN.nextInt(1024)];
+ RAN.nextBytes(b);
+ out.write(b);
+ } finally {
+ if (out != null) out.close();
+ }
+ }
+
+ static String execCmd(FsShell shell, String... args) throws Exception {
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(baout, true);
+ PrintStream old = System.out;
+ System.setOut(out);
+ shell.run(args);
+ out.close();
+ System.setOut(old);
+ return baout.toString();
+ }
+
+  /**
+   * For each line, drop everything up to and including the first occurrence
+   * of prefix.
+   *
+   * Lines that do not contain prefix are kept unchanged. Empty lines are
+   * dropped, because StringTokenizer skips them. Every line in the result
+   * ends with "\n".
+   *
+   * @param lines newline-separated text, e.g. captured "-lsr" output
+   * @param prefix the text to cut each line after
+   * @return the rewritten lines
+   */
+  private static String removePrefix(String lines, String prefix) {
+    final int prefixlen = prefix.length();
+    final StringTokenizer t = new StringTokenizer(lines, "\n");
+    final StringBuilder results = new StringBuilder();
+    while (t.hasMoreTokens()) {
+      final String s = t.nextToken();
+      final int i = s.indexOf(prefix);
+      //without this guard, indexOf == -1 cut the line at a meaningless offset
+      results.append(i < 0? s: s.substring(i + prefixlen)).append('\n');
+    }
+    return results.toString();
+  }
+}
\ No newline at end of file
Modified: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Sep 1 13:45:17 2008
@@ -39,6 +39,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -92,6 +93,7 @@
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
"\n-filelimit <n> Limit the total number of files to be <= n" +
"\n-sizelimit <n> Limit the total size to be <= n bytes" +
+ "\n-delete Delete the files existing in the dst but not in src" +
"\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
"\n interpreted as an isomorphic update to an existing directory." +
@@ -114,6 +116,7 @@
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
static enum Options {
+ DELETE("-delete", NAME + ".delete"),
FILE_LIMIT("-filelimit", NAME + ".limit.file"),
SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
@@ -793,9 +796,17 @@
+ (dst == null ? "dst path" : "src"));
}
// incompatible command-line flags
- if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
+ final boolean isOverwrite = flags.contains(Options.OVERWRITE);
+ final boolean isUpdate = flags.contains(Options.UPDATE);
+ final boolean isDelete = flags.contains(Options.DELETE);
+ if (isOverwrite && isUpdate) {
throw new IllegalArgumentException("Conflicting overwrite policies");
}
+ if (isDelete && !isOverwrite && !isUpdate) {
+ throw new IllegalArgumentException(Options.DELETE.cmd
+ + " must be specified with " + Options.OVERWRITE + " or "
+ + Options.UPDATE + ".");
+ }
return new Arguments(srcs, dst, log, flags, presevedAttributes,
filelimit, sizelimit);
}
@@ -1087,15 +1098,27 @@
checkAndClose(dir_writer);
}
+ FileStatus dststatus = null;
+ try {
+ dststatus = dstfs.getFileStatus(args.dst);
+ } catch(FileNotFoundException fnfe) {
+ LOG.info(args.dst + " does not exist.");
+ }
+
// create dest path dir if copying > 1 file
- if (!dstfs.exists(args.dst)) {
+ if (dststatus == null) {
if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
throw new IOException("Failed to create" + args.dst);
}
}
+
+ final Path sorted = new Path(jobDirectory, "_distcp_sorted");
+ checkDuplication(jobfs, dstfilelist, sorted, conf);
- checkDuplication(jobfs, dstfilelist,
- new Path(jobDirectory, "_distcp_sorted"), conf);
+ if (dststatus != null && args.flags.contains(Options.DELETE)) {
+ deleteNonexisting(dstfs, dststatus, sorted,
+ jobfs, jobDirectory, jobConf, conf);
+ }
Path tmpDir = new Path(
(dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
@@ -1121,7 +1144,106 @@
}
return src.getLen() == dst.getLen();
}
+
+  /**
+   * Delete the dst files/dirs which do not exist in src.
+   *
+   * The tree under dstroot is listed recursively into a SequenceFile keyed
+   * by path relative to dstroot. That listing is sorted and merge-compared
+   * with dstsorted. Each listed path with no equal key in dstsorted is
+   * removed with "-rmr" through an FsShell, unless one of its ancestors has
+   * already been removed that way.
+   *
+   * @param dstfs file system containing the destination tree
+   * @param dstroot status of the destination root; must be a directory
+   * @param dstsorted sorted SequenceFile of (Text, Text) records whose keys
+   *        are compared with dstroot-relative paths (presumably the sorted
+   *        destination list written by checkDuplication; confirm)
+   * @param jobfs file system for the intermediate listing files
+   * @param jobdir directory in which the intermediate listing files are created
+   * @param jobconf configuration for SequenceFile writing, reading and sorting
+   * @param conf configuration used to construct the FsShell
+   * @throws IOException if dstroot is not a directory, if listing or sorting
+   *         fails, or if a "-rmr" throws or returns non-zero
+   */
+  static private void deleteNonexisting(
+      FileSystem dstfs, FileStatus dstroot, Path dstsorted,
+      FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
+      ) throws IOException {
+    if (!dstroot.isDir()) {
+      throw new IOException("dst must be a directory when option "
+          + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
+          + ") is not a directory.");
+    }
+    //write dst lsr results
+    final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
+    writeDstLsr(dstfs, dstroot, jobfs, dstlsr, jobconf);
+
+    //sort lsr results
+    final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
+        new Text.Comparator(), Text.class, FileStatus.class, jobconf);
+    sorter.sort(dstlsr, sortedlsr);
+
+    //paths already removed by -rmr; used to skip their descendants
+    final java.util.Set<String> deleted = new java.util.HashSet<String>();
+    SequenceFile.Reader lsrin = null;
+    SequenceFile.Reader dstin = null;
+    try {
+      lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
+      dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
+
+      //compare sorted lsr list and sorted dst list
+      final Text lsrpath = new Text();
+      final FileStatus lsrstatus = new FileStatus();
+      final Text dstpath = new Text();
+      final Text dstfrom = new Text();
+      final FsShell shell = new FsShell(conf);
+      final String[] shellargs = {"-rmr", null};
+
+      boolean hasnext = dstin.next(dstpath, dstfrom);
+      while (lsrin.next(lsrpath, lsrstatus)) {
+        //advance the dst list to its first entry >= lsrpath; once it is
+        //exhausted (hasnext == false) dstpath is stale and must not be used
+        int dst_cmp_lsr = 0;
+        while (hasnext && (dst_cmp_lsr = dstpath.compareTo(lsrpath)) < 0) {
+          hasnext = dstin.next(dstpath, dstfrom);
+        }
+
+        if (hasnext && dst_cmp_lsr == 0) {
+          //lsrpath exists in dst, keep it
+          hasnext = dstin.next(dstpath, dstfrom);
+          continue;
+        }
+
+        //lsrpath does not exist in dst, delete it. Text order does not keep
+        //a directory's descendants contiguous ("a" < "a-x" < "a/b"), so all
+        //ancestors are checked, not just the most recently deleted path.
+        final String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
+        if (hasDeletedAncestor(s, deleted)) {
+          continue;
+        }
+        shellargs[1] = s;
+        int r = 0;
+        try {
+          r = shell.run(shellargs);
+        } catch(Exception e) {
+          throw new IOException("Exception from shell.", e);
+        }
+        if (r != 0) {
+          throw new IOException("\"" + shellargs[0] + " " + shellargs[1]
+              + "\" returns non-zero value " + r);
+        }
+        deleted.add(s);
+      }
+    } finally {
+      checkAndClose(lsrin);
+      checkAndClose(dstin);
+    }
+  }
+
+  /**
+   * Recursively list everything under dstroot into a SequenceFile at out,
+   * keyed by path relative to dstroot (Text) with the FileStatus as value.
+   */
+  static private void writeDstLsr(FileSystem dstfs, FileStatus dstroot,
+      FileSystem jobfs, Path out, JobConf jobconf) throws IOException {
+    final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
+        out, Text.class, FileStatus.class,
+        SequenceFile.CompressionType.NONE);
+    try {
+      final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
+      for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
+        final FileStatus status = lsrstack.pop();
+        if (!status.isDir()) {
+          continue;
+        }
+        final FileStatus[] children = dstfs.listStatus(status.getPath());
+        if (children == null) {
+          //directory vanished while listing; nothing under it to record
+          continue;
+        }
+        for(FileStatus child : children) {
+          String relative = makeRelative(dstroot.getPath(), child.getPath());
+          writer.append(new Text(relative), child);
+          lsrstack.push(child);
+        }
+      }
+    } finally {
+      checkAndClose(writer);
+    }
+  }
+
+  /**
+   * Is some proper ancestor of path string s, found by cutting s at each
+   * separator, contained in deleted?
+   */
+  static private boolean hasDeletedAncestor(String s,
+      java.util.Set<String> deleted) {
+    if (deleted.isEmpty()) {
+      return false;
+    }
+    for(int i = s.lastIndexOf(Path.SEPARATOR_CHAR); i > 0;
+        i = s.lastIndexOf(Path.SEPARATOR_CHAR, i - 1)) {
+      if (deleted.contains(s.substring(0, i))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Is path string x an ancestor of path string y, or y itself?
+   *
+   * The check is purely textual: y must start with x, and the match must end
+   * at a path-component boundary. So "/a" is an ancestor of "/a/b" but not
+   * of "/ab". A prefix that already ends with the separator (e.g. the root
+   * "/") is an ancestor of every path it prefixes.
+   */
+  static private boolean isAncestorPath(String x, String y) {
+    if (!y.startsWith(x)) {
+      return false;
+    }
+    final int len = x.length();
+    return y.length() == len
+        || (len > 0 && x.charAt(len - 1) == Path.SEPARATOR_CHAR)
+        || y.charAt(len) == Path.SEPARATOR_CHAR;
+  }
+
/** Check whether the file list have duplication. */
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
Configuration conf) throws IOException {