Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/12 23:34:53 UTC
svn commit: r405884 - in /lucene/hadoop/trunk: CHANGES.txt bin/hadoop
src/java/org/apache/hadoop/util/CopyFiles.java
src/test/org/apache/hadoop/fs/TestCopyFiles.java
Author: cutting
Date: Fri May 12 14:34:52 2006
New Revision: 405884
URL: http://svn.apache.org/viewcvs?rev=405884&view=rev
Log:
HADOOP-209. Add MapReduce-based file copier. Contributed by Milind Bhandarkar.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/bin/hadoop
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=405884&r1=405883&r2=405884&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May 12 14:34:52 2006
@@ -8,6 +8,10 @@
2. HADOOP-204. Tweaks to metrics package. (David Bowen via cutting)
+ 3. HADOOP-209. Add a MapReduce-based file copier. This will
+ copy files within or between file systems in parallel.
+ (Milind Bhandarkar via cutting)
+
Release 0.2.1 - 2006-05-12
Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop?rev=405884&r1=405883&r2=405884&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May 12 14:34:52 2006
@@ -39,6 +39,7 @@
echo " tasktracker run a MapReduce task Tracker node"
echo " job manipulate MapReduce jobs"
echo " jar <jar> run a jar file"
+ echo " cp <srcurl> <desturl> copy files or directories recursively"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -136,6 +137,8 @@
CLASS=org.apache.hadoop.mapred.JobClient
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
+elif [ "$COMMAND" = "cp" ] ; then
+ CLASS=org.apache.hadoop.util.CopyFiles
else
CLASS=$COMMAND
fi
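For reference, a minimal invocation of the new command looks like this
(the namenode address and paths are illustrative):
  bin/hadoop cp dfs://namenode:9000/user/src dfs://namenode:9000/user/dest
Per CopyFiles below, any non-dfs scheme (e.g. file://) is treated as the
local file system, so local-to-DFS and DFS-to-local copies work the same way.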
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=405884&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Fri May 12 14:34:52 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+/**
+ * A Map-reduce program to recursively copy directories between
+ * different file systems.
+ *
+ * @author Milind Bhandarkar
+ */
+public class CopyFiles extends MapReduceBase implements Reducer {
+
+ private static final String usage = "cp <srcurl> <desturl>";
+
+ /**
+ * Mapper class for copying files.
+ */
+
+ public static class CopyFilesMapper extends MapReduceBase implements Mapper {
+
+ private int sizeBuf = 4096;
+ private FileSystem srcFileSys = null;
+ private FileSystem destFileSys = null;
+ private Path srcPath = null;
+ private Path destPath = null;
+ private byte[] buffer = null;
+
+ private void copy(String src) throws IOException {
+ // open source file
+ Path srcFile = new Path(srcPath, src);
+ FSDataInputStream in = srcFileSys.open(srcFile);
+
+ // create directories to hold destination file and create destFile
+ Path destFile = new Path(destPath, src);
+ Path destParent = destFile.getParent();
+ if (destParent != null) { destFileSys.mkdirs(destParent); }
+ FSDataOutputStream out = destFileSys.create(destFile);
+
+ // copy file
+ while (true) {
+ int nread = in.read(buffer);
+ if (nread < 0) {
+ break;
+ }
+ out.write(buffer, 0, nread);
+ }
+
+ in.close();
+ out.close();
+ }
+
+ /** Mapper configuration.
+ * Extracts the source and destination file systems, as well as
+ * the top-level source and destination paths.
+ * Gets the named file systems, to be used later in map.
+ */
+ public void configure(JobConf job) {
+ String srcfs = job.get("copy.src.fs", "local");
+ String destfs = job.get("copy.dest.fs", "local");
+ srcPath = new Path(job.get("copy.src.path", "/"));
+ destPath = new Path(job.get("copy.dest.path", "/"));
+ try {
+ srcFileSys = FileSystem.getNamed(srcfs, job);
+ destFileSys = FileSystem.getNamed(destfs, job);
+ } catch (IOException ex) {
+ throw new RuntimeException("Unable to get the named file system.", ex);
+ }
+ sizeBuf = job.getInt("copy.buf.size", 4096);
+ buffer = new byte[sizeBuf];
+ }
+
+ /** Map method. Copies one file from source file system to destination.
+ * @param key source file name
+ * @param val not used.
+ * @param out not used.
+ */
+ public void map(WritableComparable key,
+ Writable val,
+ OutputCollector out,
+ Reporter reporter) throws IOException {
+ String src = ((UTF8) key).toString();
+ copy(src);
+ }
+
+ public void close() {
+ // nothing
+ }
+ }
+
+ public void reduce(WritableComparable key,
+ Iterator values,
+ OutputCollector output,
+ Reporter reporter) throws IOException {
+ // nothing
+ }
+
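+ /** Map a URI to a file system name: a "dfs" URI yields "host" or
+ * "host:port"; any other scheme (e.g. "file") yields "local".
+ * Illustrative: dfs://nn:9000/x yields "nn:9000".
+ */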
+ private static String getFileSysName(URI url) {
+ String fsname = url.getScheme();
+ if ("dfs".equals(fsname)) {
+ String host = url.getHost();
+ int port = url.getPort();
+ return (port==(-1)) ? host : (host+":"+port);
+ } else {
+ return "local";
+ }
+ }
+
+ /**
+ * Make a path relative with respect to a root path.
+ * absPath is assumed to descend from root;
+ * otherwise the returned path is null.
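+ * Illustrative example: makeRelative(new Path("/user/data"),
+ * new Path("/user/data/a/b")) returns "a/b".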
+ */
+ private static Path makeRelative(Path root, Path absPath) {
+ if (!absPath.isAbsolute()) { return absPath; }
+ String sRoot = root.toString();
+ String sPath = absPath.toString();
+ Enumeration rootTokens = new StringTokenizer(sRoot, "/");
+ ArrayList rList = Collections.list(rootTokens);
+ Enumeration pathTokens = new StringTokenizer(sPath, "/");
+ ArrayList pList = Collections.list(pathTokens);
+ Iterator rIter = rList.iterator();
+ Iterator pIter = pList.iterator();
+ while (rIter.hasNext()) {
+ String rElem = (String) rIter.next();
+ String pElem = (String) pIter.next();
+ if (!rElem.equals(pElem)) { return null; }
+ }
+ StringBuffer sb = new StringBuffer();
+ while (pIter.hasNext()) {
+ String pElem = (String) pIter.next();
+ sb.append(pElem);
+ if (pIter.hasNext()) { sb.append("/"); }
+ }
+ return new Path(sb.toString());
+ }
+ /**
+ * This is the main driver for recursively copying directories
+ * across file systems. It takes exactly two command-line parameters: a
+ * source URL and a destination URL. It then essentially does an "ls -lR"
+ * on the source URL, and writes the output in a round-robin manner to all
+ * the map input files. The mapper copies each file allotted to it, and
+ * the reduce is empty.
+ */
+ public static void main(String[] args) throws IOException {
+ if (args.length != 2) {
+ System.out.println(usage);
+ return;
+ }
+
+ Configuration conf = new Configuration();
+
+ String srcPath = args[0];
+ String destPath = args[1];
+
+ URI srcurl = null;
+ URI desturl = null;
+ try {
+ srcurl = new URI(srcPath);
+ desturl = new URI(destPath);
+ } catch (URISyntaxException ex) {
+ throw new RuntimeException("URL syntax error.", ex);
+ }
+
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setJobName("copy-files");
+
+ String srcFileSysName = getFileSysName(srcurl);
+ String destFileSysName = getFileSysName(desturl);
+
+ jobConf.set("copy.src.fs", srcFileSysName);
+ jobConf.set("copy.dest.fs", destFileSysName);
+ FileSystem srcfs;
+
+ srcfs = FileSystem.getNamed(srcFileSysName, conf);
+ FileSystem destfs = FileSystem.getNamed(destFileSysName, conf);
+
+ srcPath = srcurl.getPath();
+ if ("".equals(srcPath)) { srcPath = "/"; }
+ destPath = desturl.getPath();
+ if ("".equals(destPath)) { destPath = "/"; }
+
+ boolean isFile = false;
+ Path tmpPath = new Path(srcPath);
+ Path rootPath = new Path(srcPath);
+ if (srcfs.isFile(tmpPath)) {
+ isFile = true;
+ tmpPath = tmpPath.getParent();
+ rootPath = rootPath.getParent();
+ jobConf.set("copy.src.path", tmpPath.toString());
+ } else {
+ jobConf.set("copy.src.path", srcPath);
+ }
+ jobConf.set("copy.dest.path", destPath);
+
+ if (!srcfs.exists(tmpPath)) {
+ System.out.println(srcPath+" does not exist.");
+ return;
+ }
+
+ // turn off speculative execution, because DFS doesn't handle
+ // multiple writers to the same file.
+ jobConf.setSpeculativeExecution(false);
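+
+ // the map input is a SequenceFile of (UTF8 file name, empty UTF8 value)
+ // pairs; one part file per map task is created below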
+ jobConf.setInputKeyClass(UTF8.class);
+ jobConf.setInputValueClass(UTF8.class);
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+ jobConf.setOutputKeyClass(UTF8.class);
+ jobConf.setOutputValueClass(UTF8.class);
+ jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ jobConf.setMapperClass(CopyFilesMapper.class);
+ jobConf.setReducerClass(CopyFiles.class);
+
+ int filesPerMap = jobConf.getInt("copy.files_per_map", 10);
+ jobConf.setNumReduceTasks(1);
+
+ Path tmpDir = new Path("copy-files");
+ Path inDir = new Path(tmpDir, "in");
+ Path fakeOutDir = new Path(tmpDir, "out");
+ FileSystem fileSys = FileSystem.get(jobConf);
+ fileSys.delete(tmpDir);
+ fileSys.mkdirs(inDir);
+
+ jobConf.setInputPath(inDir);
+ jobConf.setOutputPath(fakeOutDir);
+
+ // create new sequence-files for holding paths
+ ArrayList pathList = new ArrayList();
+ ArrayList finalPathList = new ArrayList();
+ pathList.add(new Path(srcPath));
+ int part = 0;
+ while(!pathList.isEmpty()) {
+ Path top = (Path) pathList.remove(0);
+ if (srcfs.isFile(top)) {
+ top = makeRelative(rootPath, top);
+ finalPathList.add(top.toString());
+ } else {
+ Path[] paths = srcfs.listPaths(top);
+ for (int idx = 0; idx < paths.length; idx++) {
+ pathList.add(paths[idx]);
+ }
+ }
+ }
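+ // one map task per "copy.files_per_map" files (default 10), minimum one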
+ int numMaps = finalPathList.size() / filesPerMap;
+ if (numMaps == 0) { numMaps = 1; }
+ jobConf.setNumMapTasks(numMaps);
+ SequenceFile.Writer[] writers = new SequenceFile.Writer[numMaps];
+
+ for(int idx=0; idx < numMaps; ++idx) {
+ Path file = new Path(inDir, "part"+idx);
+ writers[idx] = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
+ }
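+ // deal the file names out to the part files in round-robin order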
+ while (!finalPathList.isEmpty()) {
+ String top = (String) finalPathList.remove(0);
+ UTF8 key = new UTF8(top);
+ UTF8 value = new UTF8("");
+ writers[part].append(key, value);
+ part = (part+1)%numMaps;
+ }
+
+ for(part = 0; part < numMaps; part++) {
+ writers[part].close();
+ writers[part] = null;
+ }
+
+ try {
+ JobClient.runJob(jobConf);
+ } finally {
+ fileSys.delete(tmpDir);
+ }
+
+ }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=405884&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Fri May 12 14:34:52 2006
@@ -0,0 +1,243 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Random;
+import junit.framework.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.util.CopyFiles;
+
+
+/**
+ * A JUnit test for copying files recursively.
+ *
+ * @author Milind Bhandarkar
+ */
+public class TestCopyFiles extends TestCase {
+
+ private static final int NFILES = 20;
+ private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/tmp");
+
+ /** class MyFile contains enough information to recreate the contents of
+ * a single file.
+ */
+ private static class MyFile {
+ private static Random gen = new Random();
+ private static final int MAX_LEVELS = 3;
+ private static final int MAX_SIZE = 8*1024;
+ private static String[] dirNames = {
+ "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
+ };
+ private String name = "";
+ private int size;
+ private long seed;
+
+ MyFile() {
+ int nLevels = gen.nextInt(MAX_LEVELS);
+ if(nLevels != 0) {
+ int[] levels = new int[nLevels];
+ for (int idx = 0; idx < nLevels; idx++) {
+ levels[idx] = gen.nextInt(10);
+ }
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < nLevels; idx++) {
+ sb.append(dirNames[levels[idx]]);
+ sb.append("/");
+ }
+ name = sb.toString();
+ }
+ long fidx = -1;
+ while (fidx < 0) { fidx = gen.nextLong(); }
+ name = name + Long.toString(fidx);
+ size = gen.nextInt(MAX_SIZE);
+ seed = gen.nextLong();
+ }
+
+ String getName() { return name; }
+ int getSize() { return size; }
+ long getSeed() { return seed; }
+ }
+
+ public TestCopyFiles(String testName) {
+ super(testName);
+ }
+
+ protected void setUp() throws Exception {
+ }
+
+ protected void tearDown() throws Exception {
+ }
+
+ /** create NFILES with random names and directory hierarchies
+ * with random (but reproducible) data in them.
+ */
+ private static MyFile[] createFiles(String fsname, String topdir)
+ throws IOException {
+ MyFile[] files = new MyFile[NFILES];
+
+ for (int idx = 0; idx < NFILES; idx++) {
+ files[idx] = new MyFile();
+ }
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getNamed(fsname, conf);
+ Path root = new Path(topdir);
+
+ for (int idx = 0; idx < NFILES; idx++) {
+ Path fPath = new Path(root, files[idx].getName());
+ fs.mkdirs(fPath.getParent());
+ FSDataOutputStream out = fs.create(fPath);
+ byte[] toWrite = new byte[files[idx].getSize()];
+ Random rb = new Random(files[idx].getSeed());
+ rb.nextBytes(toWrite);
+ out.write(toWrite);
+ out.close();
+ toWrite = null;
+ }
+
+ return files;
+ }
+
+ /** check if the files have been copied correctly. */
+ private static boolean checkFiles(String fsname, String topdir, MyFile[] files)
+ throws IOException {
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getNamed(fsname, conf);
+ Path root = new Path(topdir);
+
+ for (int idx = 0; idx < NFILES; idx++) {
+ Path fPath = new Path(root, files[idx].getName());
+ FSDataInputStream in = fs.open(fPath);
+ byte[] toRead = new byte[files[idx].getSize()];
+ byte[] toCompare = new byte[files[idx].getSize()];
+ Random rb = new Random(files[idx].getSeed());
+ rb.nextBytes(toCompare);
+ assertEquals("Cannot read file.", toRead.length, in.read(toRead));
+ in.close();
+ for (int i = 0; i < toRead.length; i++) {
+ if (toRead[i] != toCompare[i]) {
+ return false;
+ }
+ }
+ toRead = null;
+ toCompare = null;
+ }
+
+ return true;
+ }
+
+ /** delete directory and everything underneath it.*/
+ private static void deldir(String fsname, String topdir)
+ throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getNamed(fsname, conf);
+ Path root = new Path(topdir);
+ fs.delete(root);
+ }
+
+ /** copy files from local file system to local file system */
+ public void testCopyFromLocalToLocal() throws IOException {
+ MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+ CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+ "file://"+TEST_ROOT_DIR+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+ deldir("local", TEST_ROOT_DIR+"/destdat");
+ deldir("local", TEST_ROOT_DIR+"/srcdat");
+ }
+
+ /** copy files from dfs file system to dfs file system */
+ public void testCopyFromDfsToDfs() throws IOException {
+ String namenode = null;
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+ try {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(65314, conf);
+ fileSys = cluster.getFileSystem();
+ namenode = fileSys.getName();
+ if (!"local".equals(namenode)) {
+ MyFile[] files = createFiles(namenode, "/srcdat");
+ CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+ "dfs://"+namenode+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ deldir(namenode, "/destdat");
+ deldir(namenode, "/srcdat");
+ }
+ } finally {
+ if (fileSys != null) { fileSys.close(); }
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+ /** copy files from local file system to dfs file system */
+ public void testCopyFromLocalToDfs() throws IOException {
+ String namenode = null;
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+ try {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(65316, conf);
+ fileSys = cluster.getFileSystem();
+ namenode = fileSys.getName();
+ if (!"local".equals(namenode)) {
+ MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+ CopyFiles.main(new String[] {"file://"+TEST_ROOT_DIR+"/srcdat",
+ "dfs://"+namenode+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles(namenode, "/destdat", files));
+ deldir(namenode, "/destdat");
+ deldir("local", TEST_ROOT_DIR+"/srcdat");
+ }
+ } finally {
+ if (fileSys != null) { fileSys.close(); }
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+ /** copy files from dfs file system to local file system */
+ public void testCopyFromDfsToLocal() throws IOException {
+ String namenode = null;
+ MiniDFSCluster cluster = null;
+ FileSystem fileSys = null;
+ try {
+ Configuration conf = new Configuration();
+ cluster = new MiniDFSCluster(65318, conf);
+ fileSys = cluster.getFileSystem();
+ namenode = fileSys.getName();
+ if (!"local".equals(namenode)) {
+ MyFile[] files = createFiles(namenode, "/srcdat");
+ CopyFiles.main(new String[] {"dfs://"+namenode+"/srcdat",
+ "file://"+TEST_ROOT_DIR+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+ deldir("local", TEST_ROOT_DIR+"/destdat");
+ deldir(namenode, "/srcdat");
+ }
+ } finally {
+ if (fileSys != null) { fileSys.close(); }
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+}
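For reference, assuming the standard Ant build, the new test should be
picked up by the usual JUnit test target (the exact target name depends
on the project's build.xml):
  ant test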