Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/19 21:52:07 UTC
svn commit: r407894 - in /lucene/hadoop/trunk: bin/hadoop src/java/org/apache/hadoop/util/CopyFiles.java
Author: cutting
Date: Fri May 19 12:52:06 2006
New Revision: 407894
URL: http://svn.apache.org/viewvc?rev=407894&view=rev
Log:
HADOOP-220. Improvements to mapreduce-based file copier. Contributed by Milind.
Modified:
lucene/hadoop/trunk/bin/hadoop
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/hadoop?rev=407894&r1=407893&r2=407894&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May 19 12:52:06 2006
@@ -39,7 +39,7 @@
echo " tasktracker run a MapReduce task Tracker node"
echo " job manipulate MapReduce jobs"
echo " jar <jar> run a jar file"
- echo " cp <srcurl> <desturl> copy file or directories recursively"
+ echo " distcp <srcurl> <desturl> copy file or directories recursively"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -137,7 +137,7 @@
CLASS=org.apache.hadoop.mapred.JobClient
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
-elif [ "$COMMAND" = "cp" ] ; then
+elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.util.CopyFiles
else
CLASS=$COMMAND
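The bin/hadoop change is purely a rename of the command-line entry point: the same org.apache.hadoop.util.CopyFiles class is now reached through "distcp" instead of "cp", so per the updated usage text the copier is invoked as:

bin/hadoop distcp <srcurl> <desturl>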
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=407894&r1=407893&r2=407894&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Fri May 19 12:52:06 2006
@@ -53,8 +53,13 @@
*/
public class CopyFiles extends MapReduceBase implements Reducer {
- private static final String usage = "cp <srcurl> <desturl>";
+ private static final String usage = "distcp <srcurl> <desturl> "+
+ "[-dfs <namenode:port | local> ] [-jt <jobtracker:port | local>] " +
+ "[-config <config-file.xml>]";
+ private static final long MIN_BYTES_PER_MAP = 1L << 28;
+ private static final int MAX_NUM_MAPS = 10000;
+ private static final int MAX_MAPS_PER_NODE = 10;
/**
* Mapper class for copying files.
*/
@@ -67,8 +72,11 @@
private Path srcPath = null;
private Path destPath = null;
private byte[] buffer = null;
+ private static final long reportInterval = 1L << 25;
+ private long bytesSinceLastReport = 0L;
+ private long totalBytesCopied = 0L;
- private void copy(String src) throws IOException {
+ private void copy(String src, Reporter reporter) throws IOException {
// open source file
Path srcFile = new Path(srcPath, src);
FSDataInputStream in = srcFileSys.open(srcFile);
@@ -82,14 +90,22 @@
// copy file
while (true) {
int nread = in.read(buffer);
- if (nread < 0) {
- break;
- }
+ if (nread < 0) { break; }
out.write(buffer, 0, nread);
+ bytesSinceLastReport += nread;
+ if (bytesSinceLastReport > reportInterval) {
+ totalBytesCopied += bytesSinceLastReport;
+ bytesSinceLastReport = 0L;
+ reporter.setStatus("Total bytes copied: "+totalBytesCopied);
+ }
}
in.close();
out.close();
+ // report at least once for each file
+ totalBytesCopied += bytesSinceLastReport;
+ bytesSinceLastReport = 0L;
+ reporter.setStatus("Total bytes copied: "+totalBytesCopied);
}
/** Mapper configuration.
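The copy loop above batches progress updates so the framework is not pinged on every buffer read: bytes are accumulated locally, the status string is refreshed only once roughly 32 MB (reportInterval = 1 << 25 bytes) have moved, and once more at the end of each file so even small files report. A minimal, self-contained sketch of the same pattern, using plain java.io streams and a hypothetical ProgressReporter callback in place of Hadoop's Reporter, might look like this:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical stand-in for Hadoop's Reporter; only the status call is sketched. */
interface ProgressReporter {
  void setStatus(String status);
}

class ThrottledCopy {
  // Mirror of reportInterval = 1L << 25 above: report roughly every 32 MB.
  private static final long REPORT_INTERVAL = 1L << 25;

  private long bytesSinceLastReport = 0L;
  private long totalBytesCopied = 0L;

  /** Copy in to out, updating the reporter at most once per REPORT_INTERVAL bytes. */
  void copy(InputStream in, OutputStream out, ProgressReporter reporter)
      throws IOException {
    byte[] buffer = new byte[4096];
    while (true) {
      int nread = in.read(buffer);
      if (nread < 0) { break; }
      out.write(buffer, 0, nread);
      bytesSinceLastReport += nread;
      if (bytesSinceLastReport > REPORT_INTERVAL) {
        totalBytesCopied += bytesSinceLastReport;
        bytesSinceLastReport = 0L;
        reporter.setStatus("Total bytes copied: " + totalBytesCopied);
      }
    }
    // Flush the remainder so every file triggers at least one status update.
    totalBytesCopied += bytesSinceLastReport;
    bytesSinceLastReport = 0L;
    reporter.setStatus("Total bytes copied: " + totalBytesCopied);
  }
}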
@@ -122,7 +138,7 @@
OutputCollector out,
Reporter reporter) throws IOException {
String src = ((UTF8) key).toString();
- copy(src);
+ copy(src, reporter);
}
public void close() {
@@ -185,15 +201,47 @@
* the reduce is empty.
*/
public static void main(String[] args) throws IOException {
- if (args.length != 2) {
- System.out.println(usage);
- return;
- }
-
+
Configuration conf = new Configuration();
+ String srcPath = null;
+ String destPath = null;
+
+ for (int idx = 0; idx < args.length; idx++) {
+ if ("-dfs".equals(args[idx])) {
+ if (idx == (args.length-1)) {
+ System.out.println(usage);
+ return;
+ }
+ conf.set("fs.default.name", args[++idx]);
+ } else if ("-jt".equals(args[idx])) {
+ if (idx == (args.length-1)) {
+ System.out.println(usage);
+ return;
+ }
+ conf.set("mapred.job.tracker", args[++idx]);
+ } else if ("-config".equals(args[idx])) {
+ if (idx == (args.length-1)) {
+ System.out.println(usage);
+ return;
+ }
+ conf.addFinalResource(new Path(args[++idx]));
+ } else {
+ if (srcPath == null) {
+ srcPath = args[idx];
+ } else if (destPath == null) {
+ destPath = args[idx];
+ } else {
+ System.out.println(usage);
+ return;
+ }
+ }
+ }
- String srcPath = args[0];
- String destPath = args[1];
+ // mandatory command-line parameters
+ if (srcPath == null || destPath == null) {
+ System.out.println(usage);
+ return;
+ }
URI srcurl = null;
URI desturl = null;
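The new argument scan walks the command line left to right: -dfs, -jt, and -config each consume the following token (falling back to the usage message if it is missing), and the first two non-flag arguments become source and destination. Expanding the usage string, an invocation with all options present would look like the line below; flag order is free, and "local" presumably selects the local filesystem or the local job runner:

bin/hadoop distcp -dfs namenode:port -jt jobtracker:port -config config-file.xml <srcurl> <desturl>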
@@ -204,7 +252,7 @@
throw new RuntimeException("URL syntax error.", ex);
}
- JobConf jobConf = new JobConf(conf);
+ JobConf jobConf = new JobConf(conf, CopyFiles.class);
jobConf.setJobName("copy-files");
String srcFileSysName = getFileSysName(srcurl);
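Passing CopyFiles.class to the JobConf constructor presumably lets the framework locate the jar containing CopyFiles and ship it with the job, so the mapper and reducer classes are on the tasktrackers' classpath; the old JobConf(conf) form relied on the job jar being set some other way.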
@@ -254,7 +302,6 @@
jobConf.setMapperClass(CopyFilesMapper.class);
jobConf.setReducerClass(CopyFiles.class);
- int filesPerMap = jobConf.getInt("copy.files_per_map", 10);
jobConf.setNumReduceTasks(1);
Path tmpDir = new Path("copy-files");
@@ -272,10 +319,12 @@
ArrayList pathList = new ArrayList();
ArrayList finalPathList = new ArrayList();
pathList.add(new Path(srcPath));
+ long totalBytes = 0;
int part = 0;
while(!pathList.isEmpty()) {
Path top = (Path) pathList.remove(0);
if (srcfs.isFile(top)) {
+ totalBytes += srcfs.getLength(top);
top = makeRelative(rootPath, top);
finalPathList.add(top.toString());
} else {
@@ -285,27 +334,34 @@
}
}
}
- int numMaps = finalPathList.size() / filesPerMap;
+ // The ideal number of maps is one per file (assuming zero map-launching
+ // overhead). It is limited by the jobtracker's handling capacity, capped
+ // here at MAX_NUM_MAPS, and by MAX_MAPS_PER_NODE per tasktracker. For small
+ // files it is better to determine the number of maps by the amount of data per map.
+
+ int nFiles = finalPathList.size();
+ int numMaps = nFiles;
+ if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
+ if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
+ numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
+ }
+ JobClient client = new JobClient(jobConf);
+ ClusterStatus cluster = client.getClusterStatus();
+ int tmpMaps = cluster.getTaskTrackers() * MAX_MAPS_PER_NODE;
+ if (numMaps > tmpMaps) { numMaps = tmpMaps; }
if (numMaps == 0) { numMaps = 1; }
jobConf.setNumMapTasks(numMaps);
- SequenceFile.Writer[] writers = new SequenceFile.Writer[numMaps];
for(int idx=0; idx < numMaps; ++idx) {
Path file = new Path(inDir, "part"+idx);
- writers[idx] = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
- }
- while (!finalPathList.isEmpty()) {
- String top = (String) finalPathList.remove(0);
- UTF8 key = new UTF8(top);
- UTF8 value = new UTF8("");
- writers[part].append(key, value);
- part = (part+1)%numMaps;
- }
-
- for(part = 0; part < numMaps; part++) {
- writers[part].close();
- writers[part] = null;
+ SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, UTF8.class, UTF8.class);
+ for (int ipath = idx; ipath < nFiles; ipath += numMaps) {
+ String path = (String) finalPathList.get(ipath);
+ writer.append(new UTF8(path), new UTF8(""));
+ }
+ writer.close();
}
+ finalPathList = null;
try {
JobClient.runJob(jobConf);
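The map-count logic above starts from one map per file and then clamps it: never more than MAX_NUM_MAPS (10000) in total, never more than totalBytes / MIN_BYTES_PER_MAP (one map per 256 MB), never more than MAX_MAPS_PER_NODE (10) times the number of tasktrackers reported by the cluster, and always at least one. As a worked example with assumed inputs (none of these numbers appear in the commit): 50,000 files totalling 1 TB on a 40-tracker cluster gives 50,000 -> 10,000 (MAX_NUM_MAPS cap) -> 4,096 (1 TB / 256 MB) -> 400 (40 * 10) maps. The per-map input files are then written by striding through the file list, so map idx receives files idx, idx + numMaps, idx + 2*numMaps, and so on. A self-contained sketch of both steps, with the constants copied from the commit and everything else illustrative:

import java.util.ArrayList;
import java.util.List;

/** Sketch of the map-count clamping and round-robin file assignment above. */
class DistCpPlanSketch {
  private static final long MIN_BYTES_PER_MAP = 1L << 28;  // at least 256 MB per map
  private static final int MAX_NUM_MAPS = 10000;           // jobtracker capacity cap
  private static final int MAX_MAPS_PER_NODE = 10;         // per-tasktracker cap

  /** Start from one map per file, then apply the three caps and the floor of one. */
  static int numMaps(int nFiles, long totalBytes, int numTaskTrackers) {
    int numMaps = nFiles;
    if (numMaps > MAX_NUM_MAPS) { numMaps = MAX_NUM_MAPS; }
    if (numMaps > (int) (totalBytes / MIN_BYTES_PER_MAP)) {
      numMaps = (int) (totalBytes / MIN_BYTES_PER_MAP);
    }
    int clusterCap = numTaskTrackers * MAX_MAPS_PER_NODE;
    if (numMaps > clusterCap) { numMaps = clusterCap; }
    if (numMaps == 0) { numMaps = 1; }
    return numMaps;
  }

  /** Stripe the file list across maps the way the SequenceFile writers do:
   *  map idx gets files idx, idx + numMaps, idx + 2 * numMaps, ... */
  static List<List<String>> assign(List<String> files, int numMaps) {
    List<List<String>> perMap = new ArrayList<List<String>>();
    for (int idx = 0; idx < numMaps; ++idx) {
      List<String> mine = new ArrayList<String>();
      for (int ipath = idx; ipath < files.size(); ipath += numMaps) {
        mine.add(files.get(ipath));
      }
      perMap.add(mine);
    }
    return perMap;
  }

  public static void main(String[] args) {
    // Worked example: 50,000 files, 1 TB total, 40 tasktrackers -> 400 maps.
    System.out.println(numMaps(50000, 1L << 40, 40));
  }
}

Striding through the list one writer at a time also means only a single SequenceFile.Writer is open at any moment, whereas the replaced round-robin code kept numMaps writers open simultaneously.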