Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/12 23:34:53 UTC

svn commit: r405884 - in /lucene/hadoop/trunk: CHANGES.txt bin/hadoop src/java/org/apache/hadoop/util/CopyFiles.java src/test/org/apache/hadoop/fs/TestCopyFiles.java

Author: cutting
Date: Fri May 12 14:34:52 2006
New Revision: 405884

URL: http://svn.apache.org/viewcvs?rev=405884&view=rev
Log:
HADOOP-209.  Add MapReduce-based file copier.  Contributed by Milind Bhandarkar.
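This change wires the copier into the launcher script as a "cp" subcommand (see the bin/hadoop diff below). An illustrative invocation, with host, port and paths as placeholders in the spirit of the URLs used by the new tests:

    bin/hadoop cp dfs://namenode:port/srcdat dfs://namenode:port/destdat

Either URL may instead name the local file system (e.g. file:///tmp/srcdat), so the same command covers local-to-local, local-to-DFS, DFS-to-local and DFS-to-DFS copies.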

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/bin/hadoop

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=405884&r1=405883&r2=405884&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May 12 14:34:52 2006
@@ -8,6 +8,10 @@
 
  2. HADOOP-204.  Tweaks to metrics package.  (David Bowen via cutting)
 
+ 3. HADOOP-209.  Add a MapReduce-based file copier.  This will
+    copy files within or between file systems in parallel.
+    (Milind Bhandarkar via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop?rev=405884&r1=405883&r2=405884&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May 12 14:34:52 2006
@@ -39,6 +39,7 @@
   echo "  tasktracker       run a MapReduce task Tracker node" 
   echo "  job               manipulate MapReduce jobs" 
   echo "  jar <jar>         run a jar file"
+  echo "  cp <srcurl> <desturl> copy files or directories recursively"
   echo " or"
   echo "  CLASSNAME         run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
@@ -136,6 +137,8 @@
   CLASS=org.apache.hadoop.mapred.JobClient
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hadoop.util.RunJar
+elif [ "$COMMAND" = "cp" ] ; then
+  CLASS=org.apache.hadoop.util.CopyFiles
 else
   CLASS=$COMMAND
 fi

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=405884&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Fri May 12 14:34:52 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+/**
+ * A Map-reduce program to recursively copy directories between
+ * different file systems.
+ *
+ * @author Milind Bhandarkar
+ */
+public class CopyFiles extends MapReduceBase implements Reducer {
+  
+  private static final String usage = "cp <srcurl> <desturl>";
+  
+  /**
+   * Mapper class for copying files.
+   */
+  
+  public static class CopyFilesMapper extends MapReduceBase implements Mapper {
+    
+    private int sizeBuf = 4096;
+    private FileSystem srcFileSys = null;
+    private FileSystem destFileSys = null;
+    private Path srcPath = null;
+    private Path destPath = null;
+    private byte[] buffer = null;
+    
+    private void copy(String src) throws IOException {
+      // open source file
+      Path srcFile = new Path(srcPath, src);
+      FSDataInputStream in = srcFileSys.open(srcFile);
+      
+      // create directories to hold destination file and create destFile
+      Path destFile = new Path(destPath, src);
+      Path destParent = destFile.getParent();
+      if (destParent != null) { destFileSys.mkdirs(destParent); }
+      FSDataOutputStream out = destFileSys.create(destFile);
+      
+      // copy file
+      while (true) {
+        int nread = in.read(buffer);
+        if (nread < 0) {
+          break;
+        }
+        out.write(buffer, 0, nread);
+      }
+      
+      in.close();
+      out.close();
+    }
+    
+    /** Mapper configuration.
+     * Extracts the source and destination file system names, as well as
+     * the top-level source and destination paths, and gets the named
+     * file systems for later use in map.
+     */
+    public void configure(JobConf job) {
+      String srcfs = job.get("copy.src.fs", "local");
+      String destfs = job.get("copy.dest.fs", "local");
+      srcPath = new Path(job.get("copy.src.path", "/"));
+      destPath = new Path(job.get("copy.dest.path", "/"));
+      try {
+        srcFileSys = FileSystem.getNamed(srcfs, job);
+        destFileSys = FileSystem.getNamed(destfs, job);
+      } catch (IOException ex) {
+        throw new RuntimeException("Unable to get the named file system.", ex);
+      }
+      sizeBuf = job.getInt("copy.buf.size", 4096);
+      buffer = new byte[sizeBuf];
+    }
+    
+    /** Map method. Copies one file from the source file system to the destination.
+     * @param key source file name
+     * @param val not used.
+     * @param out not used.
+     */
+    public void map(WritableComparable key,
+        Writable val,
+        OutputCollector out,
+        Reporter reporter) throws IOException {
+      String src = ((UTF8) key).toString();
+      copy(src);
+    }
+    
+    public void close() {
+      // nothing
+    }
+  }
+  
+  public void reduce(WritableComparable key,
+      Iterator values,
+      OutputCollector output,
+      Reporter reporter) throws IOException {
+    // nothing
+  }
+  
+  private static String getFileSysName(URI url) {
+    String fsname = url.getScheme();
+    if ("dfs".equals(fsname)) {
+      String host = url.getHost();
+      int port = url.getPort();
+      return (port==(-1)) ? host : (host+":"+port);
+    } else {
+      return "local";
+    }
+  }
+
+  /**
+   * Make a path relative with respect to a root path.
+   * absPath is assumed to descend from root; if it does not,
+   * null is returned.
+   */
+  private static Path makeRelative(Path root, Path absPath) {
+    if (!absPath.isAbsolute()) { return absPath; }
+    String sRoot = root.toString();
+    String sPath = absPath.toString();
+    Enumeration rootTokens = new StringTokenizer(sRoot, "/");
+    ArrayList rList = Collections.list(rootTokens);
+    Enumeration pathTokens = new StringTokenizer(sPath, "/");
+    ArrayList pList = Collections.list(pathTokens);
+    Iterator rIter = rList.iterator();
+    Iterator pIter = pList.iterator();
+    while (rIter.hasNext()) {
+      String rElem = (String) rIter.next();
+      String pElem = (String) pIter.next();
+      if (!rElem.equals(pElem)) { return null; }
+    }
+    StringBuffer sb = new StringBuffer();
+    while (pIter.hasNext()) {
+      String pElem = (String) pIter.next();
+      sb.append(pElem);
+      if (pIter.hasNext()) { sb.append("/"); }
+    }
+    return new Path(sb.toString());
+  }
+  /**
+   * This is the main driver for recursively copying directories
+   * across file systems. It takes two command-line parameters, a source
+   * URL and a destination URL. It then essentially does an "ls -lR" on the
+   * source URL, and writes the output in a round-robin manner to all the map
+   * input files. Each mapper copies the files allotted to it, and the
+   * reduce is empty.
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 2) {
+      System.out.println(usage);
+      return;
+    }
+    
+    Configuration conf = new Configuration();
+    
+    String srcPath = args[0];
+    String destPath = args[1];
+    
+    URI srcurl = null;
+    URI desturl = null;
+    try {
+      srcurl = new URI(srcPath);
+      desturl = new URI(destPath);
+    } catch (URISyntaxException ex) {
+      throw new RuntimeException("URL syntax error.", ex);
+    }
+    
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("copy-files");
+    
+    String srcFileSysName = getFileSysName(srcurl);
+    String destFileSysName = getFileSysName(desturl);
+    
+    jobConf.set("copy.src.fs", srcFileSysName);
+    jobConf.set("copy.dest.fs", destFileSysName);
+    FileSystem srcfs;
+   
+    srcfs = FileSystem.getNamed(srcFileSysName, conf);
+    FileSystem destfs = FileSystem.getNamed(destFileSysName, conf);
+ 
+    srcPath = srcurl.getPath();
+    if ("".equals(srcPath)) { srcPath = "/"; }
+    destPath = desturl.getPath();
+    if ("".equals(destPath)) { destPath = "/"; }
+    
+    boolean isFile = false;
+    Path tmpPath = new Path(srcPath);
+    Path rootPath = new Path(srcPath);
+    if (srcfs.isFile(tmpPath)) {
+      isFile = true;
+      tmpPath = tmpPath.getParent();
+      rootPath = rootPath.getParent();
+      jobConf.set("copy.src.path", tmpPath.toString());
+    } else {
+      jobConf.set("copy.src.path", srcPath);
+    }
+    jobConf.set("copy.dest.path", destPath);
+    
+    if (!srcfs.exists(tmpPath)) {
+      System.out.println(srcPath+" does not exist.");
+      return;
+    }
+    
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(UTF8.class);
+    jobConf.setInputValueClass(UTF8.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+        
+    jobConf.setOutputKeyClass(UTF8.class);
+    jobConf.setOutputValueClass(UTF8.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    jobConf.setMapperClass(CopyFilesMapper.class);
+    jobConf.setReducerClass(CopyFiles.class);
+    
+    int filesPerMap = jobConf.getInt("copy.files_per_map", 10);
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path("copy-files");
+    Path inDir = new Path(tmpDir, "in");
+    Path fakeOutDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+    
+    
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(fakeOutDir);
+    
+    // create new sequence-files for holding paths
+    ArrayList pathList = new ArrayList();
+    ArrayList finalPathList = new ArrayList();
+    pathList.add(new Path(srcPath));
+    int part = 0;
+    while(!pathList.isEmpty()) {
+      Path top = (Path) pathList.remove(0);
+      if (srcfs.isFile(top)) {
+        top = makeRelative(rootPath, top);
+        finalPathList.add(top.toString());
+      } else {
+        Path[] paths = srcfs.listPaths(top);
+        for (int idx = 0; idx < paths.length; idx++) {
+          pathList.add(paths[idx]);
+        }
+      }
+    }
+    int numMaps = finalPathList.size() / filesPerMap;
+    if (numMaps == 0) { numMaps = 1; }
+    jobConf.setNumMapTasks(numMaps);
+    SequenceFile.Writer[] writers = new SequenceFile.Writer[numMaps];
+    
+    for(int idx=0; idx < numMaps; ++idx) {
+      Path file = new Path(inDir, "part"+idx);
+      writers[idx] = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
+    }
+    while (!finalPathList.isEmpty()) {
+      String top = (String) finalPathList.remove(0);
+      UTF8 key = new UTF8(top);
+      UTF8 value = new UTF8("");
+      writers[part].append(key, value);
+      part = (part+1)%numMaps;
+    }
+
+    for(part = 0; part < numMaps; part++) {
+      writers[part].close();
+      writers[part] = null;
+    }
+    
+    try {
+      JobClient.runJob(jobConf);
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+  
+  }
+}
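
For context on the driver above: it enumerates every file under the source root, makes each path relative to that root, and then deals the names out round-robin across the per-map SequenceFiles (writers[part]). A minimal standalone sketch of that dealing step, using plain Java collections rather than Hadoop types (the class name, sample paths and filesPerMap value are made up for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinSketch {
      public static void main(String[] args) {
        // Pretend these came from the recursive "ls -lR" walk of the source root.
        List<String> files = Arrays.asList("a/one", "a/two", "b/three", "four", "b/c/five");

        int filesPerMap = 2;                                  // mirrors copy.files_per_map
        int numMaps = Math.max(1, files.size() / filesPerMap); // same floor-then-min-1 rule as the driver

        // One bucket per map task; bucket i stands in for the SequenceFile "part" + i.
        List<List<String>> buckets = new ArrayList<List<String>>();
        for (int i = 0; i < numMaps; i++) {
          buckets.add(new ArrayList<String>());
        }

        // Deal the file names out round-robin, as the driver does with writers[part].
        int part = 0;
        for (String f : files) {
          buckets.get(part).add(f);
          part = (part + 1) % numMaps;
        }

        for (int i = 0; i < numMaps; i++) {
          System.out.println("part" + i + " -> " + buckets.get(i));
        }
      }
    }

With five files and filesPerMap = 2 this yields two buckets: part0 receives "a/one", "b/three" and "b/c/five", while part1 receives "a/two" and "four", which is the same split the driver would write into its two map input SequenceFiles.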

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=405884&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Fri May 12 14:34:52 2006
@@ -0,0 +1,243 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.util.CopyFiles;
+
+
+/**
+ * A JUnit test for copying files recursively.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestCopyFiles extends TestCase {
+  
+  private static final int NFILES = 20;
+  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/tmp");
+  
+  /** class MyFile contains enough information to recreate the contents of
+   * a single file.
+   */
+  private static class MyFile {
+    private static Random gen = new Random();
+    private static final int MAX_LEVELS = 3;
+    private static final int MAX_SIZE = 8*1024;
+    private static String[] dirNames = {
+      "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+    };
+    private String name = "";
+    private int size;
+    private long seed;
+    
+    MyFile() {
+      int nLevels = gen.nextInt(MAX_LEVELS);
+      if(nLevels != 0) {
+        int[] levels = new int[nLevels];
+        for (int idx = 0; idx < nLevels; idx++) {
+          levels[idx] = gen.nextInt(10);
+        }
+        StringBuffer sb = new StringBuffer();
+        for (int idx = 0; idx < nLevels; idx++) {
+          sb.append(dirNames[levels[idx]]);
+          sb.append("/");
+        }
+        name = sb.toString();
+      }
+      long fidx = -1;
+      while (fidx < 0) { fidx = gen.nextLong(); }
+      name = name + Long.toString(fidx);
+      size = gen.nextInt(MAX_SIZE);
+      seed = gen.nextLong();
+    }
+    
+    String getName() { return name; }
+    int getSize() { return size; }
+    long getSeed() { return seed; }
+  }
+  
+  public TestCopyFiles(String testName) {
+    super(testName);
+  }
+
+  
+  
+  protected void setUp() throws Exception {
+  }
+
+  protected void tearDown() throws Exception {
+  }
+  
+  /** Create NFILES files with random names and directory hierarchies,
+   * with random (but reproducible) data in them.
+   */
+  private static MyFile[] createFiles(String fsname, String topdir)
+  throws IOException {
+    MyFile[] files = new MyFile[NFILES];
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      files[idx] = new MyFile();
+    }
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      fs.mkdirs(fPath.getParent());
+      FSDataOutputStream out = fs.create(fPath);
+      byte[] toWrite = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toWrite);
+      out.write(toWrite);
+      out.close();
+      toWrite = null;
+    }
+    
+    return files;
+  }
+  
+  /** check if the files have been copied correctly. */
+  private static boolean checkFiles(String fsname, String topdir, MyFile[] files) 
+  throws IOException {
+    
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    
+    for (int idx = 0; idx < NFILES; idx++) {
+      Path fPath = new Path(root, files[idx].getName());
+      FSDataInputStream in = fs.open(fPath);
+      byte[] toRead = new byte[files[idx].getSize()];
+      byte[] toCompare = new byte[files[idx].getSize()];
+      Random rb = new Random(files[idx].getSeed());
+      rb.nextBytes(toCompare);
+      assertEquals("Cannot read file.", toRead.length, in.read(toRead));
+      in.close();
+      for (int i = 0; i < toRead.length; i++) {
+        if (toRead[i] != toCompare[i]) {
+          return false;
+        }
+      }
+      toRead = null;
+      toCompare = null;
+    }
+    
+    return true;
+  }
+  
+  /** delete directory and everything underneath it.*/
+  private static void deldir(String fsname, String topdir)
+  throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getNamed(fsname, conf);
+    Path root = new Path(topdir);
+    fs.delete(root);
+  }
+  
+  /** copy files from local file system to local file system */
+  public void testCopyFromLocalToLocal() throws IOException {
+    MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+    CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+      "file://"+TEST_ROOT_DIR+"/destdat"});
+    assertTrue("Source and destination directories do not match.",
+        checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+    deldir("local", TEST_ROOT_DIR+"/destdat");
+    deldir("local", TEST_ROOT_DIR+"/srcdat");
+  }
+  
+  /** copy files from dfs file system to dfs file system */
+  public void testCopyFromDfsToDfs() throws IOException {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65314, conf);
+      fileSys = cluster.getFileSystem();
+      namenode = fileSys.getName();
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        "dfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+            checkFiles(namenode, "/destdat", files));
+        deldir(namenode, "/destdat");
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+  
+  /** copy files from local file system to dfs file system */
+  public void testCopyFromLocalToDfs() throws IOException {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65316, conf);
+      fileSys = cluster.getFileSystem();
+      namenode = fileSys.getName();
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+        CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+        "dfs://"+namenode+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+            checkFiles(namenode, "/destdat", files));
+        deldir(namenode, "/destdat");
+        deldir("local", TEST_ROOT_DIR+"/srcdat");
+      }
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
+  /** copy files from dfs file system to local file system */
+  public void testCopyFromDfsToLocal() throws IOException {
+    String namenode = null;
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster(65318, conf);
+      fileSys = cluster.getFileSystem();
+      namenode = fileSys.getName();
+      if (!"local".equals(namenode)) {
+        MyFile[] files = createFiles(namenode, "/srcdat");
+        CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+        "file://"+TEST_ROOT_DIR+"/destdat"});
+        assertTrue("Source and destination directories do not match.",
+            checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+        deldir("local", TEST_ROOT_DIR+"/destdat");
+        deldir(namenode, "/srcdat");
+      }
+    } finally {
+      if (fileSys != null) { fileSys.close(); }
+      if (cluster != null) { cluster.shutdown(); }
+    }
+ }
+  
+}
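
The tests above avoid storing expected file contents by deriving them from a per-file random seed: createFiles writes bytes produced by new Random(seed), and checkFiles regenerates the same bytes from the same seed and compares. A minimal standalone sketch of that seed-based round trip (the class name, seed and temp file are arbitrary, and java.nio.file stands in for the Hadoop FileSystem API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Arrays;
    import java.util.Random;

    public class SeededContentCheck {
      public static void main(String[] args) throws IOException {
        long seed = 12345L;          // arbitrary, but must be remembered per file
        int size = 8 * 1024;         // mirrors MAX_SIZE in the test

        // "createFiles": write size bytes generated from the seed.
        byte[] written = new byte[size];
        new Random(seed).nextBytes(written);
        Path f = Files.createTempFile("seeded", ".dat");
        Files.write(f, written);

        // "checkFiles": regenerate the expected bytes from the same seed and compare.
        byte[] expected = new byte[size];
        new Random(seed).nextBytes(expected);
        byte[] readBack = Files.readAllBytes(f);

        System.out.println("contents match: " + Arrays.equals(readBack, expected));
        Files.delete(f);
      }
    }

Because java.util.Random is deterministic for a given seed, the test only has to persist (name, size, seed) per file to verify an arbitrary copied tree.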