Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/11/20 02:31:40 UTC

svn commit: r719152 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/tools/ src/test/org/apache/hadoop/tools/TestDistCh.java src/tools/org/apache/hadoop/tools/DistCh.java src/tools/org/apache/hadoop/tools/DistTool.java

Author: szetszwo
Date: Wed Nov 19 17:31:39 2008
New Revision: 719152

URL: http://svn.apache.org/viewvc?rev=719152&view=rev
Log:
HADOOP-4661. Add DistCh, a new tool for distributed ch{mod,own,grp}. (szetszwo)
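
The tool's usage, per the USAGE string in DistCh.java below:

  java org.apache.hadoop.tools.DistCh [OPTIONS] <path:owner:group:permission>+

Each of owner, group and permission may be left empty to keep the current
value. OPTIONS are -f <urilist_uri> (use the list at <urilist_uri> as the
source list), -i (ignore failures) and -log <logdir> (write logs to <logdir>).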

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/tools/
    hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
    hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java
Modified:
    hadoop/core/trunk/CHANGES.txt

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=719152&r1=719151&r2=719152&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Nov 19 17:31:39 2008
@@ -39,6 +39,9 @@
     Includes client authentication via user certificates and config-based
     access control. (Kan Zhang via cdouglas)
 
+    HADOOP-4661. Add DistCh, a new tool for distributed ch{mod,own,grp}.
+    (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-4234. Fix KFS "glue" layer to allow applications to interface

Added: hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.log4j.Level;
+
+public class TestDistCh extends junit.framework.TestCase {
+  {
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
+        ).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)TaskTracker.LOG).getLogger().setLevel(Level.OFF);
+  }
+
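+  /** Set to a non-null value to reproduce a run from its printed seed. */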
+  static final Long RANDOM_NUMBER_GENERATOR_SEED = null;
+
+  private static final Random RANDOM = new Random();
+  static {
+    final long seed = RANDOM_NUMBER_GENERATOR_SEED == null?
+        RANDOM.nextLong(): RANDOM_NUMBER_GENERATOR_SEED;
+    System.out.println("seed=" + seed);
+    RANDOM.setSeed(seed);
+  }
+
+  static final String TEST_ROOT_DIR =
+    new Path(System.getProperty("test.build.data","/tmp")
+        ).toString().replace(' ', '+');
+
+  static final int NUM_SUBS = 5;
+
+  static class FileTree {
+    private final FileSystem fs;
+    private final String root;
+    private final Path rootdir;
+    private int fcount = 0;
+
+    Path createSmallFile(Path dir) throws IOException {
+      final Path f = new Path(dir, "f" + ++fcount);
+      assertTrue(!fs.exists(f));
+      final DataOutputStream out = fs.create(f);
+      try {
+        out.writeBytes("createSmallFile: f=" + f);
+      } finally {
+        out.close();
+      }
+      assertTrue(fs.exists(f));
+      return f;
+    }
+
+    Path mkdir(Path dir) throws IOException {
+      assertTrue(fs.mkdirs(dir));
+      assertTrue(fs.getFileStatus(dir).isDir());
+      return dir;
+    }
+    
+    FileTree(FileSystem fs, String name) throws IOException {
+      this.fs = fs;
+      this.root = "/test/" + name;
+      this.rootdir = mkdir(new Path(root));
+  
+      for(int i = 0; i < 3; i++) {
+        createSmallFile(rootdir);
+      }
+      
+      for(int i = 0; i < NUM_SUBS; i++) {
+        final Path sub = mkdir(new Path(root, "sub" + i));
+        int num_files = RANDOM.nextInt(3);
+        for(int j = 0; j < num_files; j++) {
+          createSmallFile(sub);
+        }
+      }
+      
+      System.out.println("rootdir = " + rootdir);
+    }
+  }
+
+  static class ChPermissionStatus extends PermissionStatus {
+    ChPermissionStatus(FileStatus filestatus) {
+      this(filestatus, "", "", "");
+    }
+
+    ChPermissionStatus(FileStatus filestatus, String owner, String group, String permission) {
+      super("".equals(owner)? filestatus.getOwner(): owner, 
+          "".equals(group)? filestatus.getGroup(): group,
+          "".equals(permission)? filestatus.getPermission(): new FsPermission(Short.parseShort(permission, 8)));
+    }
+  }
+  
+  public void testDistCh() throws Exception {
+    final Configuration conf = new Configuration();
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem fs = cluster.getFileSystem();
+    final MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+    final FsShell shell = new FsShell(conf);
+    
+    try {
+      final FileTree tree = new FileTree(fs, "testDistCh");
+      final FileStatus rootstatus = fs.getFileStatus(tree.rootdir);
+
+      runLsr(shell, tree.root, 0);
+
+      //generate random arguments
+      final String[] args = new String[RANDOM.nextInt(NUM_SUBS-1) + 1];
+      final PermissionStatus[] newstatus = new PermissionStatus[NUM_SUBS];
+      final List<Integer> indices = new LinkedList<Integer>();
+      for(int i = 0; i < NUM_SUBS; i++) {
+        indices.add(i);
+      }
+      for(int i = 0; i < args.length; i++) {
+        final int index = indices.remove(RANDOM.nextInt(indices.size()));
+        final String sub = "sub" + index;
+        final boolean changeOwner = RANDOM.nextBoolean();
+        final boolean changeGroup = RANDOM.nextBoolean();
+        final boolean changeMode = !changeOwner && !changeGroup? true: RANDOM.nextBoolean();
+        
+        final String owner = changeOwner? sub: "";
+        final String group = changeGroup? sub: "";
+        final String permission = changeMode? RANDOM.nextInt(8) + "" + RANDOM.nextInt(8) + "" + RANDOM.nextInt(8): "";
+
+        args[i] = tree.root + "/" + sub + ":" + owner + ":" + group + ":" + permission;
+        newstatus[index] = new ChPermissionStatus(rootstatus, owner, group, permission);
+      }
+      for(int i = 0; i < NUM_SUBS; i++) {
+        if (newstatus[i] == null) {
+          newstatus[i] = new ChPermissionStatus(rootstatus);
+        }
+      }
+      System.out.println("args=" + Arrays.asList(args).toString().replace(",", ",\n  "));
+      System.out.println("newstatus=" + Arrays.asList(newstatus).toString().replace(",", ",\n  "));
+
+      //run DistCh
+      new DistCh(mr.createJobConf()).run(args);
+      runLsr(shell, tree.root, 0);
+
+      //check results
+      for(int i = 0; i < NUM_SUBS; i++) {
+        Path sub = new Path(tree.root + "/sub" + i);
+        checkFileStatus(newstatus[i], fs.getFileStatus(sub));
+        for(FileStatus status : fs.listStatus(sub)) {
+          checkFileStatus(newstatus[i], status);
+        }
+      }
+    } finally {
+      mr.shutdown();
+      cluster.shutdown();
+    }
+  }
+
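+  // Mirrors FileOperation.FILE_UMASK in DistCh: execute bits are masked out
+  // of the expected permission for files (but not for directories).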
+  static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
+
+  static void checkFileStatus(PermissionStatus expected, FileStatus actual) {
+    assertEquals(expected.getUserName(), actual.getOwner());
+    assertEquals(expected.getGroupName(), actual.getGroup());
+    FsPermission perm = expected.getPermission(); 
+    if (!actual.isDir()) {
+      perm = perm.applyUMask(UMASK);
+    }
+    assertEquals(perm, actual.getPermission());
+  }
+
+  private static String runLsr(final FsShell shell, String root, int returnvalue
+      ) throws Exception {
+    System.out.println("root=" + root + ", returnvalue=" + returnvalue);
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
+    final PrintStream out = new PrintStream(bytes);
+    final PrintStream oldOut = System.out;
+    final PrintStream oldErr = System.err;
+    System.setOut(out);
+    System.setErr(out);
+    final String results;
+    try {
+      assertEquals(returnvalue, shell.run(new String[]{"-lsr", root}));
+      results = bytes.toString();
+    } finally {
+      IOUtils.closeStream(out);
+      System.setOut(oldOut);
+      System.setErr(oldErr);
+    }
+    System.out.println("results:\n" + results);
+    return results;
+  }
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A Map-reduce program to recursively change file properties
+ * such as owner, group and permission.
+ */
+public class DistCh extends DistTool {
+  static final String NAME = "distch";
+  static final String JOB_DIR_LABEL = NAME + ".job.dir";
+  static final String OP_LIST_LABEL = NAME + ".op.list";
+  static final String OP_COUNT_LABEL = NAME + ".op.count";
+
+  static final String USAGE = "java " + DistCh.class.getName() 
+      + " [OPTIONS] <path:owner:group:permission>+ "
+
+      + "\n\nThe values of owner, group and permission can be empty."
+      + "\nPermission is a octal number."
+
+      + "\n\nOPTIONS:"
+      + "\n-f <urilist_uri>       Use list at <urilist_uri> as src list"
+      + "\n-i                     Ignore failures"
+      + "\n-log <logdir>          Write logs to <logdir>"
+      ;
+
+  private static final long OP_PER_MAP =  1000;
+  private static final int MAX_MAPS_PER_NODE = 20;
+  private static final int SYNC_FILE_MAX = 10;
+
+  static enum Counter { SUCCEED, FAIL }
+
+  static enum Option {
+    IGNORE_FAILURES("-i", NAME + ".ignore.failures");
+
+    final String cmd, propertyname;
+
+    private Option(String cmd, String propertyname) {
+      this.cmd = cmd;
+      this.propertyname = propertyname;
+    }
+  }
+
+  DistCh(Configuration conf) {
+    super(createJobConf(conf));
+  }
+
+  private static JobConf createJobConf(Configuration conf) {
+    JobConf jobconf = new JobConf(conf, DistCh.class);
+    jobconf.setJobName(NAME);
+    jobconf.setMapSpeculativeExecution(false);
+
+    jobconf.setInputFormat(ChangeInputFormat.class);
+    jobconf.setOutputKeyClass(Text.class);
+    jobconf.setOutputValueClass(Text.class);
+
+    jobconf.setMapperClass(ChangeFilesMapper.class);
+    jobconf.setNumReduceTasks(0);
+    return jobconf;
+  }
+
+  /** File operations. */
+  static class FileOperation implements Writable {
+    private Path src;
+    private String owner;
+    private String group;
+    private FsPermission permission;
+
+    FileOperation() {}
+
+    FileOperation(Path src, FileOperation that) {
+      this.src = src;
+      this.owner = that.owner;
+      this.group = that.group;
+      this.permission = that.permission;
+      checkState();
+    }
+
+    /**
+     * path:owner:group:permission
+     * e.g.
+     * /user/foo:foo:bar:700 
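+     * An empty field leaves that attribute unchanged, e.g.
+     * /user/foo:::755 changes only the permission.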
+     */
+    FileOperation(String line) {
+      try {
+        String[] t = line.split(":", 4);
+        for(int i = 0; i < t.length; i++) {
+          if ("".equals(t[i])) {
+            t[i] = null;
+          }
+        }
+
+        src = new Path(t[0]);
+        owner = t[1];
+        group = t[2];
+        permission = t[3] == null? null:
+          new FsPermission(Short.parseShort(t[3], 8));
+
+        checkState();
+      }
+      catch(Exception e) {
+        throw (IllegalArgumentException)new IllegalArgumentException(
+            "line=" + line).initCause(e);
+      }
+    }
+
+    private void checkState() throws IllegalStateException {
+      if (owner == null && group == null && permission == null) {
+        throw new IllegalStateException(
+            "owner == null && group == null && permission == null");
+      }
+    }
+
+    static final FsPermission FILE_UMASK
+        = FsPermission.createImmutable((short)0111);
+
+    private boolean isDifferent(FileStatus original) {
+      if (owner != null && !owner.equals(original.getOwner())) {
+        return true;
+      }
+      if (group != null && !group.equals(original.getGroup())) {
+        return true;
+      }
+      if (permission != null) {
+        FsPermission orig = original.getPermission();
+        return original.isDir()? !permission.equals(orig):
+          !permission.applyUMask(FILE_UMASK).equals(orig);
+      }
+      return false;
+    }
+
+    void run(Configuration conf) throws IOException {
+      FileSystem fs = src.getFileSystem(conf);
+      if (permission != null) {
+        fs.setPermission(src, permission);
+      }
+      if (owner != null || group != null) {
+        fs.setOwner(src, owner, group);
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void readFields(DataInput in) throws IOException {
+      this.src = new Path(Text.readString(in));
+      owner = DistTool.readString(in);
+      group = DistTool.readString(in);
+      permission = in.readBoolean()? FsPermission.read(in): null;
+    }
+
+    /** {@inheritDoc} */
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, src.toString());
+      DistTool.writeString(out, owner);
+      DistTool.writeString(out, group);
+
+      boolean b = permission != null;
+      out.writeBoolean(b);
+      if (b) {permission.write(out);}
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return src + ":" + owner + ":" + group + ":" + permission; 
+    }
+  }
+
+  /** Responsible for generating splits of the src file list. */
+  static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
+    /** Do nothing. */
+    public void validateInput(JobConf job) {}
+
+    /**
+     * Produce splits by cutting the serialized operation list into chunks
+     * of roughly equal record count.
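+     * For example, 25 operations with numSplits = 4 give targetcount = 6, so
+     * a boundary is emitted after every 7th record: three splits of 7 records
+     * each, plus a final split holding the remaining 4.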
+     * @param job The handle to the JobConf object
+     * @param numSplits Number of splits requested
+     */
+    public InputSplit[] getSplits(JobConf job, int numSplits
+        ) throws IOException {
+      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
+      final int targetcount = srcCount / numSplits;
+      String srclist = job.get(OP_LIST_LABEL, "");
+      if (srcCount < 0 || "".equals(srclist)) {
+        throw new RuntimeException("Invalid metadata: #files(" + srcCount +
+                                   ") listuri(" + srclist + ")");
+      }
+      Path srcs = new Path(srclist);
+      FileSystem fs = srcs.getFileSystem(job);
+
+      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+
+      Text key = new Text();
+      FileOperation value = new FileOperation();
+      SequenceFile.Reader in = null;
+      long prev = 0L;
+      int count = 0; //count src
+      try {
+        for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
+          long curr = in.getPosition();
+          long delta = curr - prev;
+          if (++count > targetcount) {
+            count = 0;
+            splits.add(new FileSplit(srcs, prev, delta, (String[])null));
+            prev = curr;
+          }
+        }
+      }
+      finally {
+        if (in != null) {
+          in.close();
+        }
+      }
+      long remaining = fs.getFileStatus(srcs).getLen() - prev;
+      if (remaining != 0) {
+        splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
+      }
+      LOG.info("numSplits="  + numSplits + ", splits.size()=" + splits.size());
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+
+    /** {@inheritDoc} */
+    public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return new SequenceFileRecordReader<Text, FileOperation>(job,
+          (FileSplit)split);
+    }
+  }
+
+  /** The mapper for changing files. */
+  static class ChangeFilesMapper 
+      implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
+    private JobConf jobconf;
+    private boolean ignoreFailures;
+
+    private int failcount = 0;
+    private int succeedcount = 0;
+
+    private String getCountString() {
+      return "Succeeded: " + succeedcount + " Failed: " + failcount;
+    }
+
+    /** {@inheritDoc} */
+    public void configure(JobConf job) {
+      this.jobconf = job;
+      ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
+    }
+
+    /** Run a FileOperation */
+    public void map(Text key, FileOperation value,
+        OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
+        ) throws IOException {
+      try {
+        value.run(jobconf);
+        ++succeedcount;
+        reporter.incrCounter(Counter.SUCCEED, 1);
+      } catch (IOException e) {
+        ++failcount;
+        reporter.incrCounter(Counter.FAIL, 1);
+
+        String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
+        out.collect(null, new Text(s));
+        LOG.info(s);
+      } finally {
+        reporter.setStatus(getCountString());
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      if (failcount == 0 || ignoreFailures) {
+        return;
+      }
+      throw new IOException(getCountString());
+    }
+  }
+
+  private static void check(Configuration conf, List<FileOperation> ops
+      ) throws InvalidInputException {
+    List<Path> srcs = new ArrayList<Path>();
+    for(FileOperation op : ops) {
+      srcs.add(op.src);
+    }
+    DistTool.checkSource(conf, srcs);
+  }
+
+  private static List<FileOperation> fetchList(Configuration conf, Path inputfile
+      ) throws IOException {
+    List<FileOperation> result = new ArrayList<FileOperation>();
+    for(String line : readFile(conf, inputfile)) {
+      result.add(new FileOperation(line));
+    }
+    return result;
+  }
+
+  /** This is the main driver for recursively changing file properties. */
+  public int run(String[] args) throws Exception {
+    List<FileOperation> ops = new ArrayList<FileOperation>();
+    Path logpath = null;
+    boolean isIgnoreFailures = false;
+
+    try {
+      for (int idx = 0; idx < args.length; idx++) {
+        if ("-f".equals(args[idx])) {
+          if (++idx ==  args.length) {
+            System.out.println("urilist_uri not specified");
+            System.out.println(USAGE);
+            return -1;
+          }
+          ops.addAll(fetchList(jobconf, new Path(args[idx])));
+        } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
+          isIgnoreFailures = true;
+        } else if ("-log".equals(args[idx])) {
+          if (++idx ==  args.length) {
+            System.out.println("logdir not specified");
+            System.out.println(USAGE);
+            return -1;
+          }
+          logpath = new Path(args[idx]);
+        } else if ('-' == args[idx].codePointAt(0)) {
+          System.out.println("Invalid switch " + args[idx]);
+          System.out.println(USAGE);
+          ToolRunner.printGenericCommandUsage(System.out);
+          return -1;
+        } else {
+          ops.add(new FileOperation(args[idx]));
+        }
+      }
+      // mandatory command-line parameters
+      if (ops.isEmpty()) {
+        throw new IllegalStateException("Operation is empty");
+      }
+      LOG.info("ops=" + ops);
+      LOG.info("isIgnoreFailures=" + isIgnoreFailures);
+      jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
+      check(jobconf, ops);
+
+      try {
+        if (setup(ops, logpath)) {
+          JobClient.runJob(jobconf);
+        }
+      } finally {
+        try {
+          if (logpath == null) {
+            //delete log directory
+            final Path logdir = FileOutputFormat.getOutputPath(jobconf);
+            if (logdir != null) {
+              logdir.getFileSystem(jobconf).delete(logdir, true);
+            }
+          }
+        }
+        finally {
+          //delete job directory
+          final String jobdir = jobconf.get(JOB_DIR_LABEL);
+          if (jobdir != null) {
+            final Path jobpath = new Path(jobdir);
+            jobpath.getFileSystem(jobconf).delete(jobpath, true);
+          }
+        }
+      }
+    } catch(DuplicationException e) {
+      LOG.error("Input error:", e);
+      return DuplicationException.ERROR_CODE;
+    } catch(Exception e) {
+      LOG.error(NAME + " failed: ", e);
+      System.out.println(USAGE);
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+    return 0;
+  }
+
+  /** Calculate how many maps to run. */
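+  // e.g. 50000 pending operations on 10 task tracker nodes:
+  // 50000/1000 = 50 maps, capped at 10*20 = 200 and floored at 1.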
+  private static int getMapCount(int srcCount, int numNodes) {
+    int numMaps = (int)(srcCount / OP_PER_MAP);
+    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+    return Math.max(numMaps, 1);
+  }
+
+  private boolean setup(List<FileOperation> ops, Path log) throws IOException {
+    final String randomId = getRandomId();
+    JobClient jClient = new JobClient(jobconf);
+    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+    LOG.info(JOB_DIR_LABEL + "=" + jobdir);
+
+    if (log == null) {
+      log = new Path(jobdir, "_logs");
+    }
+    FileOutputFormat.setOutputPath(jobconf, log);
+    LOG.info("log=" + log);
+
+    //create operation list
+    FileSystem fs = jobdir.getFileSystem(jobconf);
+    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
+    jobconf.set(OP_LIST_LABEL, opList.toString());
+    int opCount = 0, synCount = 0;
+    SequenceFile.Writer opWriter = null;
+    try {
+      opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
+          FileOperation.class, SequenceFile.CompressionType.NONE);
+      for(FileOperation op : ops) {
+        FileStatus srcstat = fs.getFileStatus(op.src); 
+        if (srcstat.isDir() && op.isDifferent(srcstat)) {
+          ++opCount;
+          opWriter.append(new Text(op.src.toString()), op);
+        }
+
+        Stack<Path> pathstack = new Stack<Path>();
+        for(pathstack.push(op.src); !pathstack.empty(); ) {
+          for(FileStatus stat : fs.listStatus(pathstack.pop())) {
+            if (stat.isDir()) {
+              pathstack.push(stat.getPath());
+            }
+
+            if (op.isDifferent(stat)) {              
+              ++opCount;
+              if (++synCount > SYNC_FILE_MAX) {
+                opWriter.sync();
+                synCount = 0;
+              }
+              Path f = stat.getPath();
+              opWriter.append(new Text(f.toString()), new FileOperation(f, op));
+            }
+          }
+        }
+      }
+    } finally {
+      opWriter.close();
+    }
+
+    checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
+    jobconf.setInt(OP_COUNT_LABEL, opCount);
+    LOG.info(OP_COUNT_LABEL + "=" + opCount);
+    jobconf.setNumMapTasks(getMapCount(opCount,
+        new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
+    return opCount != 0;    
+  }
+
+  private static void checkDuplication(FileSystem fs, Path file, Path sorted,
+    Configuration conf) throws IOException {
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new Text.Comparator(), Text.class, FileOperation.class, conf);
+    sorter.sort(file, sorted);
+    SequenceFile.Reader in = null;
+    try {
+      in = new SequenceFile.Reader(fs, sorted, conf);
+      FileOperation curop = new FileOperation();
+      Text prevsrc = null, cursrc = new Text(); 
+      for(; in.next(cursrc, curop); ) {
+        if (prevsrc != null && cursrc.equals(prevsrc)) {
+          throw new DuplicationException(
+            "Invalid input, there are duplicated files in the sources: "
+            + prevsrc + ", " + cursrc);
+        }
+        prevsrc = cursrc;
+        cursrc = new Text();
+        curop = new FileOperation();
+      }
+    }
+    finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+  } 
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
+  }
+}
\ No newline at end of file
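
As a worked example of the argument format, a hypothetical driver (the paths
and owner/group values here are invented for illustration) can run the tool
through its public main(), the same entry point used above; note that
DistCh.main calls System.exit itself:

    import org.apache.hadoop.tools.DistCh;

    public class DistChExample {
      public static void main(String[] args) throws Exception {
        // Each operand is path:owner:group:permission; empty fields are
        // left unchanged by the tool.
        DistCh.main(new String[] {
            "/user/foo:foo:staff:",  // change owner and group, keep permission
            "/user/bar:::700"        // change permission only
        });
      }
    }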

Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * An abstract class for distributed tools that perform file-related operations.
+ */
+abstract class DistTool implements org.apache.hadoop.util.Tool {
+  protected static final Log LOG = LogFactory.getLog(DistTool.class);
+
+  protected JobConf jobconf;
+
+  /** {@inheritDoc} */
+  public void setConf(Configuration conf) {
+    if (jobconf != conf) {
+      jobconf = conf instanceof JobConf? (JobConf)conf: new JobConf(conf);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public JobConf getConf() {return jobconf;}
+
+  protected DistTool(Configuration conf) {setConf(conf);}
+
+  private static final Random RANDOM = new Random();
+  protected static String getRandomId() {
+    return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
+  }
+
+  /** Sanity check for the source paths. */
+  protected static void checkSource(Configuration conf, List<Path> srcs
+      ) throws InvalidInputException {
+    List<IOException> ioes = new ArrayList<IOException>();
+    for(Path p : srcs) {
+      try {
+        if (!p.getFileSystem(conf).exists(p)) {
+          ioes.add(new FileNotFoundException("Source "+p+" does not exist."));
+        }
+      }
+      catch(IOException e) {ioes.add(e);}
+    }
+    if (!ioes.isEmpty()) {
+      throw new InvalidInputException(ioes);
+    }
+  }
+
+  protected static String readString(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      return Text.readString(in);
+    }
+    return null;
+  }
+
+  protected static void writeString(DataOutput out, String s
+      ) throws IOException {
+    boolean b = s != null;
+    out.writeBoolean(b);
+    if (b) {Text.writeString(out, s);}
+  }
+
+  protected static List<String> readFile(Configuration conf, Path inputfile
+      ) throws IOException {
+    List<String> result = new ArrayList<String>();
+    FileSystem fs = inputfile.getFileSystem(conf);
+    BufferedReader input = null;
+    try {
+      input = new BufferedReader(new InputStreamReader(fs.open(inputfile)));
+      for(String line; (line = input.readLine()) != null;) {
+        result.add(line);
+      }
+    } finally {
+      if (input != null) {
+        input.close();
+      }
+    }
+    return result;
+  }
+
+  /** An exception class for duplicated source files. */
+  public static class DuplicationException extends IOException {
+    private static final long serialVersionUID = 1L;
+    /** Error code for this exception */
+    public static final int ERROR_CODE = -2;
+    DuplicationException(String message) {super(message);}
+  }
+}
\ No newline at end of file
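
The null-safe string encoding used by FileOperation.write/readFields above is
a boolean presence flag followed by the string itself when non-null. A minimal
round-trip sketch (a hypothetical helper, which must live in the
org.apache.hadoop.tools package since DistTool and these methods are not
public):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class DistToolStringRoundTrip {
      static void roundTrip() throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        DistTool.writeString(out, null);     // writes just a 'false' flag
        DistTool.writeString(out, "staff");  // writes 'true', then the text
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        assert DistTool.readString(in) == null;
        assert "staff".equals(DistTool.readString(in));
      }
    }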