You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/09/01 22:45:17 UTC

svn commit: r691055 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/fs/TestCopyFiles.java src/tools/org/apache/hadoop/tools/DistCp.java

Author: cdouglas
Date: Mon Sep  1 13:45:17 2008
New Revision: 691055

URL: http://svn.apache.org/viewvc?rev=691055&view=rev
Log:
HADOOP-3939. Add an option to DistCp to delete files at the destination
not present at the source. Contributed by Tsz Wo (Nicholas) Sze.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep  1 13:45:17 2008
@@ -114,6 +114,9 @@
     HADOOP-3948. Separate name-node edits and fsimage directories.
     (Lohit Vijayarenu via shv)
 
+    HADOOP-3939. Add an option to DistCp to delete files at the destination
+    not present at the source. (Tsz Wo (Nicholas) Sze via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Mon Sep  1 13:45:17 2008
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.fs;
 
+import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.StringTokenizer;
 
 import junit.framework.TestCase;
 
@@ -54,7 +57,6 @@
     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
     ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
-    //((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL);
   }
   
   static final URI LOCAL_FS = URI.create("file:///");
@@ -742,4 +744,107 @@
       if (cluster != null) { cluster.shutdown(); }
     }
   }
-}
+
+  /**
+   * Tests the -delete option: files and directories present in the
+   * destination but absent from the source must be removed, so that after
+   * "distcp -delete -update src dst" both trees list identically.
+   */
+  public void testDelete() throws Exception {
+    final Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 2, true, null);
+      final URI nnURI = FileSystem.getDefaultUri(conf);
+      final String nnUri = nnURI.toString();
+      final FileSystem fs = FileSystem.get(nnURI, conf);
+
+      final DistCp distcp = new DistCp(conf);
+      final FsShell shell = new FsShell(conf);
+
+      final String srcrootdir = "/src_root";
+      final String dstrootdir = "/dst_root";
+
+      {//test -delete
+        createFiles(nnURI, srcrootdir);
+        createFiles(nnURI, dstrootdir);
+        // extra files that exist only in dst; -delete must remove them
+        create(fs, new Path(dstrootdir, "foo"));
+        create(fs, new Path(dstrootdir, "foobar"));
+
+        System.out.println("srcrootdir=" +  srcrootdir);
+        shell.run(new String[]{"-lsr", srcrootdir});
+
+        System.out.println("dstrootdir=" +  dstrootdir);
+        shell.run(new String[]{"-lsr", dstrootdir});
+
+        ToolRunner.run(distcp,
+            new String[]{"-delete", "-update", "-log", "/log",
+                         nnUri+srcrootdir, nnUri+dstrootdir});
+
+        String srcresults = execCmd(shell, "-lsr", srcrootdir);
+        srcresults = removePrefix(srcresults, srcrootdir);
+        System.out.println("srcresults=" +  srcresults);
+
+        String dstresults = execCmd(shell, "-lsr", dstrootdir);
+        dstresults = removePrefix(dstresults, dstrootdir);
+        System.out.println("dstresults=" +  dstresults);
+
+        assertEquals(srcresults, dstresults);
+        assertFalse(fs.exists(new Path(dstrootdir, "foo")));
+        assertFalse(fs.exists(new Path(dstrootdir, "foobar")));
+        deldir(fs, dstrootdir);
+        deldir(fs, srcrootdir);
+      }
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
+  /** Create file f filled with 1024 to 2047 random bytes. */
+  static void create(FileSystem fs, Path f) throws IOException {
+    final FSDataOutputStream out = fs.create(f);
+    try {
+      final byte[] b = new byte[1024 + RAN.nextInt(1024)];
+      RAN.nextBytes(b);
+      out.write(b);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Run an FsShell command and return what it printed to System.out.
+   * System.out is restored even if the command throws.
+   */
+  static String execCmd(FsShell shell, String... args) throws Exception {
+    final ByteArrayOutputStream baout = new ByteArrayOutputStream();
+    final PrintStream out = new PrintStream(baout, true);
+    final PrintStream old = System.out;
+    System.setOut(out);
+    try {
+      shell.run(args);
+    } finally {
+      System.setOut(old);
+      out.close();
+    }
+    return baout.toString();
+  }
+
+  /**
+   * For each line, drop everything up to and including the first
+   * occurrence of prefix, so listings of different roots can be compared.
+   * Lines that do not contain prefix are kept unchanged.
+   */
+  private static String removePrefix(String lines, String prefix) {
+    final int prefixlen = prefix.length();
+    final StringTokenizer t = new StringTokenizer(lines, "\n");
+    final StringBuilder results = new StringBuilder();
+    while (t.hasMoreTokens()) {
+      final String s = t.nextToken();
+      final int i = s.indexOf(prefix);
+      results.append(i < 0? s: s.substring(i + prefixlen)).append('\n');
+    }
+    return results.toString();
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java?rev=691055&r1=691054&r2=691055&view=diff
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCp.java Mon Sep  1 13:45:17 2008
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -92,6 +93,7 @@
     "\n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
     "\n-filelimit <n>         Limit the total number of files to be <= n" +
     "\n-sizelimit <n>         Limit the total size to be <= n bytes" +
+    "\n-delete                Delete the files existing in the dst but not in src" +
     
     "\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
     "\n      interpreted as an isomorphic update to an existing directory." +
@@ -114,6 +116,7 @@
 
   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
   static enum Options {
+    DELETE("-delete", NAME + ".delete"),
     FILE_LIMIT("-filelimit", NAME + ".limit.file"),
     SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
     IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
@@ -793,9 +796,17 @@
             + (dst == null ? "dst path" : "src"));
       }
       // incompatible command-line flags
-      if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
+      final boolean isOverwrite = flags.contains(Options.OVERWRITE);
+      final boolean isUpdate = flags.contains(Options.UPDATE);
+      final boolean isDelete = flags.contains(Options.DELETE);
+      if (isOverwrite && isUpdate) {
         throw new IllegalArgumentException("Conflicting overwrite policies");
       }
+      if (isDelete && !isOverwrite && !isUpdate) {
+        throw new IllegalArgumentException(Options.DELETE.cmd
+            + " must be specified with " + Options.OVERWRITE + " or "
+            + Options.UPDATE + ".");
+      }
       return new Arguments(srcs, dst, log, flags, presevedAttributes,
           filelimit, sizelimit);
     }
@@ -1087,15 +1098,27 @@
       checkAndClose(dir_writer);
     }
 
+    FileStatus dststatus = null;
+    try {
+      dststatus = dstfs.getFileStatus(args.dst);
+    } catch(FileNotFoundException fnfe) {
+      LOG.info(args.dst + " does not exist.");
+    }
+
     // create dest path dir if copying > 1 file
-    if (!dstfs.exists(args.dst)) {
+    if (dststatus == null) {
       if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
         throw new IOException("Failed to create" + args.dst);
       }
     }
+    
+    final Path sorted = new Path(jobDirectory, "_distcp_sorted"); 
+    checkDuplication(jobfs, dstfilelist, sorted, conf);
 
-    checkDuplication(jobfs, dstfilelist,
-        new Path(jobDirectory, "_distcp_sorted"), conf);
+    if (dststatus != null && args.flags.contains(Options.DELETE)) {
+      deleteNonexisting(dstfs, dststatus, sorted,
+          jobfs, jobDirectory, jobConf, conf);
+    }
 
     Path tmpDir = new Path(
         (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
@@ -1121,7 +1144,132 @@
     }
     return src.getLen() == dst.getLen();
   }
+
+  /**
+   * Delete the dst files/dirs which do not exist in src.
+   *
+   * Lists dstroot recursively, sorts the listing by path relative to
+   * dstroot, and merges it with dstsorted.  Each listed path with no
+   * matching key in dstsorted is removed with FsShell "-rmr".
+   *
+   * @param dstfs     file system of the destination
+   * @param dstroot   status of the destination root; must be a directory
+   * @param dstsorted sorted (Text, Text) SequenceFile of dst paths; its keys
+   *                  are compared against paths relative to dstroot
+   * @param jobfs     file system holding the temporary listings
+   * @param jobdir    directory in which the temporary listings are written
+   * @param jobconf   configuration for SequenceFile I/O
+   * @param conf      configuration for the FsShell that performs deletions
+   * @throws IOException if dstroot is not a directory or a deletion fails
+   */
+  static private void deleteNonexisting(
+      FileSystem dstfs, FileStatus dstroot, Path dstsorted,
+      FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
+      ) throws IOException {
+    if (!dstroot.isDir()) {
+      throw new IOException("dst must be a directory when option "
+          + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
+          + ") is not a directory.");
+    }

+    //write dst lsr results
+    final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
+    final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
+        dstlsr, Text.class, FileStatus.class,
+        SequenceFile.CompressionType.NONE);
+    try {
+      //do lsr to get all file statuses in dstroot
+      final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
+      for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
+        final FileStatus status = lsrstack.pop();
+        if (status.isDir()) {
+          final FileStatus[] children = dstfs.listStatus(status.getPath());
+          if (children == null) {
+            continue; //listStatus may return null if the dir has vanished
+          }
+          for(FileStatus child : children) {
+            String relative = makeRelative(dstroot.getPath(), child.getPath());
+            writer.append(new Text(relative), child);
+            lsrstack.push(child);
+          }
+        }
+      }
+    } finally {
+      checkAndClose(writer);
+    }
+
+    //sort lsr results
+    final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
+        new Text.Comparator(), Text.class, FileStatus.class, jobconf);
+    sorter.sort(dstlsr, sortedlsr);
+
+    //compare lsr list and dst list
+    SequenceFile.Reader lsrin = null;
+    SequenceFile.Reader dstin = null;
+    try {
+      lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
+      dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
+
+      //compare sorted lsr list and sorted dst list
+      final Text lsrpath = new Text();
+      final FileStatus lsrstatus = new FileStatus();
+      final Text dstpath = new Text();
+      final Text dstfrom = new Text();
+      final FsShell shell = new FsShell(conf);
+      final String[] shellargs = {"-rmr", null};
+
+      boolean hasnext = dstin.next(dstpath, dstfrom);
+      while (lsrin.next(lsrpath, lsrstatus)) {
+        //advance the dst list to its first entry >= lsrpath
+        int dst_cmp_lsr = -1;
+        while (hasnext && (dst_cmp_lsr = dstpath.compareTo(lsrpath)) < 0) {
+          hasnext = dstin.next(dstpath, dstfrom);
+        }
+        if (hasnext && dst_cmp_lsr == 0) {
+          //lsrpath exists in dst, skip it
+          hasnext = dstin.next(dstpath, dstfrom);
+          continue;
+        }
+
+        //lsrpath does not exist in dst, delete it
+        final String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
+        if (shellargs[1] != null && isAncestorPath(shellargs[1], s)) {
+          continue; //already removed with the last deleted directory
+        }
+        //Text keys sort bytewise, so e.g. "a.txt" falls between "a" and
+        //"a/b"; a descendant of a deleted directory need not follow it
+        //directly, so also skip paths that are already gone.
+        if (!dstfs.exists(new Path(s))) {
+          continue;
+        }
+        shellargs[1] = s;
+        int r = 0;
+        try {
+          r = shell.run(shellargs);
+        } catch(Exception e) {
+          throw new IOException("Exception from shell.", e);
+        }
+        if (r != 0) {
+          throw new IOException("\"" + shellargs[0] + " " + shellargs[1]
+              + "\" returns non-zero value " + r);
+        }
+      }
+    } finally {
+      checkAndClose(lsrin);
+      checkAndClose(dstin);
+    }
+  }
+
+  /** Is x equal to y, or an ancestor path of y? */
+  static private boolean isAncestorPath(String x, String y) {
+    if (!y.startsWith(x)) {
+      return false;
+    }
+    final int len = x.length();
+    return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;
+  }
+
   /** Check whether the file list have duplication. */
   static private void checkDuplication(FileSystem fs, Path file, Path sorted,
     Configuration conf) throws IOException {