Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/11/20 02:31:40 UTC
svn commit: r719152 - in /hadoop/core/trunk: CHANGES.txt src/test/org/apache/hadoop/tools/ src/test/org/apache/hadoop/tools/TestDistCh.java src/tools/org/apache/hadoop/tools/DistCh.java src/tools/org/apache/hadoop/tools/DistTool.java
Author: szetszwo
Date: Wed Nov 19 17:31:39 2008
New Revision: 719152
URL: http://svn.apache.org/viewvc?rev=719152&view=rev
Log:
HADOOP-4661. Add DistCh, a new tool for distributed ch{mod,own,grp}. (szetszwo)
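
For readers skimming the diff: DistCh takes operations of the form
path:owner:group:permission, where empty fields leave that attribute
unchanged and the permission is an octal mode (see the USAGE string in
DistCh.java below). A hypothetical invocation of the class added here
(paths illustrative only):

  hadoop org.apache.hadoop.tools.DistCh /user/foo:foo:bar:700 /data/logs:::755

With -f, lines in the same path:owner:group:permission format can be read
from a list file instead:

  hadoop org.apache.hadoop.tools.DistCh -f /tmp/oplist -log /tmp/distch-logs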
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/tools/
hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java
hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java
Modified:
hadoop/core/trunk/CHANGES.txt
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=719152&r1=719151&r2=719152&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Nov 19 17:31:39 2008
@@ -39,6 +39,9 @@
Includes client authentication via user certificates and config-based
access control. (Kan Zhang via cdouglas)
+ HADOOP-4661. Add DistCh, a new tool for distributed ch{mod,own,grp}.
+ (szetszwo)
+
IMPROVEMENTS
HADOOP-4234. Fix KFS "glue" layer to allow applications to interface
Added: hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/tools/TestDistCh.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.log4j.Level;
+
+public class TestDistCh extends junit.framework.TestCase {
+ {
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
+ ).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
+ ((Log4JLogger)TaskTracker.LOG).getLogger().setLevel(Level.OFF);
+ }
+
+ static final Long RANDOM_NUMBER_GENERATOR_SEED = null;
+
+ private static final Random RANDOM = new Random();
+ static {
+ final long seed = RANDOM_NUMBER_GENERATOR_SEED == null?
+ RANDOM.nextLong(): RANDOM_NUMBER_GENERATOR_SEED;
+ System.out.println("seed=" + seed);
+ RANDOM.setSeed(seed);
+ }
+
+ static final String TEST_ROOT_DIR =
+ new Path(System.getProperty("test.build.data","/tmp")
+ ).toString().replace(' ', '+');
+
+ static final int NUM_SUBS = 5;
+
+ static class FileTree {
+ private final FileSystem fs;
+ private final String root;
+ private final Path rootdir;
+ private int fcount = 0;
+
+ Path createSmallFile(Path dir) throws IOException {
+ final Path f = new Path(dir, "f" + ++fcount);
+ assertTrue(!fs.exists(f));
+ final DataOutputStream out = fs.create(f);
+ try {
+ out.writeBytes("createSmallFile: f=" + f);
+ } finally {
+ out.close();
+ }
+ assertTrue(fs.exists(f));
+ return f;
+ }
+
+ Path mkdir(Path dir) throws IOException {
+ assertTrue(fs.mkdirs(dir));
+ assertTrue(fs.getFileStatus(dir).isDir());
+ return dir;
+ }
+
+ FileTree(FileSystem fs, String name) throws IOException {
+ this.fs = fs;
+ this.root = "/test/" + name;
+ this.rootdir = mkdir(new Path(root));
+
+ for(int i = 0; i < 3; i++) {
+ createSmallFile(rootdir);
+ }
+
+ for(int i = 0; i < NUM_SUBS; i++) {
+ final Path sub = mkdir(new Path(root, "sub" + i));
+ int num_files = RANDOM.nextInt(3);
+ for(int j = 0; j < num_files; j++) {
+ createSmallFile(sub);
+ }
+ }
+
+ System.out.println("rootdir = " + rootdir);
+ }
+ }
+
+ static class ChPermissionStatus extends PermissionStatus {
+ ChPermissionStatus(FileStatus filestatus) {
+ this(filestatus, "", "", "");
+ }
+
+ ChPermissionStatus(FileStatus filestatus, String owner, String group, String permission) {
+ super("".equals(owner)? filestatus.getOwner(): owner,
+ "".equals(group)? filestatus.getGroup(): group,
+ "".equals(permission)? filestatus.getPermission(): new FsPermission(Short.parseShort(permission, 8)));
+ }
+ }
+
+ public void testDistCh() throws Exception {
+ final Configuration conf = new Configuration();
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+ final FileSystem fs = cluster.getFileSystem();
+ final MiniMRCluster mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+ final FsShell shell = new FsShell(conf);
+
+ try {
+ final FileTree tree = new FileTree(fs, "testDistCh");
+ final FileStatus rootstatus = fs.getFileStatus(tree.rootdir);
+
+ runLsr(shell, tree.root, 0);
+
+ //generate random arguments
+ final String[] args = new String[RANDOM.nextInt(NUM_SUBS-1) + 1];
+ final PermissionStatus[] newstatus = new PermissionStatus[NUM_SUBS];
+ final List<Integer> indices = new LinkedList<Integer>();
+ for(int i = 0; i < NUM_SUBS; i++) {
+ indices.add(i);
+ }
+ for(int i = 0; i < args.length; i++) {
+ final int index = indices.remove(RANDOM.nextInt(indices.size()));
+ final String sub = "sub" + index;
+ final boolean changeOwner = RANDOM.nextBoolean();
+ final boolean changeGroup = RANDOM.nextBoolean();
+ final boolean changeMode = !changeOwner && !changeGroup? true: RANDOM.nextBoolean();
+
+ final String owner = changeOwner? sub: "";
+ final String group = changeGroup? sub: "";
+ final String permission = changeMode? RANDOM.nextInt(8) + "" + RANDOM.nextInt(8) + "" + RANDOM.nextInt(8): "";
+
+ args[i] = tree.root + "/" + sub + ":" + owner + ":" + group + ":" + permission;
+ newstatus[index] = new ChPermissionStatus(rootstatus, owner, group, permission);
+ }
+ for(int i = 0; i < NUM_SUBS; i++) {
+ if (newstatus[i] == null) {
+ newstatus[i] = new ChPermissionStatus(rootstatus);
+ }
+ }
+ System.out.println("args=" + Arrays.asList(args).toString().replace(",", ",\n "));
+ System.out.println("newstatus=" + Arrays.asList(newstatus).toString().replace(",", ",\n "));
+
+ //run DistCh
+ new DistCh(mr.createJobConf()).run(args);
+ runLsr(shell, tree.root, 0);
+
+ //check results
+ for(int i = 0; i < NUM_SUBS; i++) {
+ Path sub = new Path(tree.root + "/sub" + i);
+ checkFileStatus(newstatus[i], fs.getFileStatus(sub));
+ for(FileStatus status : fs.listStatus(sub)) {
+ checkFileStatus(newstatus[i], status);
+ }
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
+
+ static void checkFileStatus(PermissionStatus expected, FileStatus actual) {
+ assertEquals(expected.getUserName(), actual.getOwner());
+ assertEquals(expected.getGroupName(), actual.getGroup());
+ FsPermission perm = expected.getPermission();
+ if (!actual.isDir()) {
+ perm = perm.applyUMask(UMASK);
+ }
+ assertEquals(perm, actual.getPermission());
+ }
+
+ private static String runLsr(final FsShell shell, String root, int returnvalue
+ ) throws Exception {
+ System.out.println("root=" + root + ", returnvalue=" + returnvalue);
+ final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ final PrintStream out = new PrintStream(bytes);
+ final PrintStream oldOut = System.out;
+ final PrintStream oldErr = System.err;
+ System.setOut(out);
+ System.setErr(out);
+ final String results;
+ try {
+ assertEquals(returnvalue, shell.run(new String[]{"-lsr", root}));
+ results = bytes.toString();
+ } finally {
+ IOUtils.closeStream(out);
+ System.setOut(oldOut);
+ System.setErr(oldErr);
+ }
+ System.out.println("results:\n" + results);
+ return results;
+ }
+}
\ No newline at end of file
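
A note on the UMASK arithmetic in checkFileStatus() above:
FsPermission.applyUMask() clears the bits set in the mask, so with a mask
of 0111 an expected mode of 0755 becomes 0644 for plain files, while
directories are compared unmasked. A minimal sketch of that arithmetic
(hypothetical values, using only the FsPermission API the test already
imports):

  // applyUMask clears the masked bits: 0755 & ~0111 == 0644
  FsPermission requested = new FsPermission((short)0755);
  FsPermission effective =
      requested.applyUMask(FsPermission.createImmutable((short)0111));
  System.out.println(effective); // prints rw-r--r--

This mirrors FILE_UMASK in DistCh.FileOperation below, which ignores the
execute bits of a requested mode when deciding whether a plain file
actually needs to change.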
Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistCh.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A Map-reduce program to recursively change file properties
+ * such as owner, group and permission.
+ */
+public class DistCh extends DistTool {
+ static final String NAME = "distch";
+ static final String JOB_DIR_LABEL = NAME + ".job.dir";
+ static final String OP_LIST_LABEL = NAME + ".op.list";
+ static final String OP_COUNT_LABEL = NAME + ".op.count";
+
+ static final String USAGE = "java " + DistCh.class.getName()
+ + " [OPTIONS] <path:owner:group:permission>+ "
+
+ + "\n\nThe values of owner, group and permission can be empty."
+ + "\nPermission is a octal number."
+
+ + "\n\nOPTIONS:"
+ + "\n-f <urilist_uri> Use list at <urilist_uri> as src list"
+ + "\n-i Ignore failures"
+ + "\n-log <logdir> Write logs to <logdir>"
+ ;
+
+ private static final long OP_PER_MAP = 1000;
+ private static final int MAX_MAPS_PER_NODE = 20;
+ private static final int SYNC_FILE_MAX = 10;
+
+ static enum Counter { SUCCEED, FAIL }
+
+ static enum Option {
+ IGNORE_FAILURES("-i", NAME + ".ignore.failures");
+
+ final String cmd, propertyname;
+
+ private Option(String cmd, String propertyname) {
+ this.cmd = cmd;
+ this.propertyname = propertyname;
+ }
+ }
+
+ DistCh(Configuration conf) {
+ super(createJobConf(conf));
+ }
+
+ private static JobConf createJobConf(Configuration conf) {
+ JobConf jobconf = new JobConf(conf, DistCh.class);
+ jobconf.setJobName(NAME);
+ jobconf.setMapSpeculativeExecution(false);
+
+ jobconf.setInputFormat(ChangeInputFormat.class);
+ jobconf.setOutputKeyClass(Text.class);
+ jobconf.setOutputValueClass(Text.class);
+
+ jobconf.setMapperClass(ChangeFilesMapper.class);
+ jobconf.setNumReduceTasks(0);
+ return jobconf;
+ }
+
+ /** File operations. */
+ static class FileOperation implements Writable {
+ private Path src;
+ private String owner;
+ private String group;
+ private FsPermission permission;
+
+ FileOperation() {}
+
+ FileOperation(Path src, FileOperation that) {
+ this.src = src;
+ this.owner = that.owner;
+ this.group = that.group;
+ this.permission = that.permission;
+ checkState();
+ }
+
+ /**
+ * Parse an operation from a line of the form
+ * path:owner:group:permission, e.g. /user/foo:foo:bar:700.
+ * Empty fields are treated as null.
+ */
+ FileOperation(String line) {
+ try {
+ String[] t = line.split(":", 4);
+ for(int i = 0; i < t.length; i++) {
+ if ("".equals(t[i])) {
+ t[i] = null;
+ }
+ }
+
+ src = new Path(t[0]);
+ owner = t[1];
+ group = t[2];
+ permission = t[3] == null? null:
+ new FsPermission(Short.parseShort(t[3], 8));
+
+ checkState();
+ }
+ catch(Exception e) {
+ throw (IllegalArgumentException)new IllegalArgumentException(
+ "line=" + line).initCause(e);
+ }
+ }
+
+ private void checkState() throws IllegalStateException {
+ if (owner == null && group == null && permission == null) {
+ throw new IllegalStateException(
+ "owner == null && group == null && permission == null");
+ }
+ }
+
+ static final FsPermission FILE_UMASK
+ = FsPermission.createImmutable((short)0111);
+
+ private boolean isDifferent(FileStatus original) {
+ if (owner != null && !owner.equals(original.getOwner())) {
+ return true;
+ }
+ if (group != null && !group.equals(original.getGroup())) {
+ return true;
+ }
+ if (permission != null) {
+ FsPermission orig = original.getPermission();
+ return original.isDir()? !permission.equals(orig):
+ !permission.applyUMask(FILE_UMASK).equals(orig);
+ }
+ return false;
+ }
+
+ void run(Configuration conf) throws IOException {
+ FileSystem fs = src.getFileSystem(conf);
+ if (permission != null) {
+ fs.setPermission(src, permission);
+ }
+ if (owner != null || group != null) {
+ fs.setOwner(src, owner, group);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ this.src = new Path(Text.readString(in));
+ owner = DistTool.readString(in);
+ group = DistTool.readString(in);
+ permission = in.readBoolean()? FsPermission.read(in): null;
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, src.toString());
+ DistTool.writeString(out, owner);
+ DistTool.writeString(out, group);
+
+ boolean b = permission != null;
+ out.writeBoolean(b);
+ if (b) {permission.write(out);}
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return src + ":" + owner + ":" + group + ":" + permission;
+ }
+ }
+
+ /** Responsible for generating splits of the src file list. */
+ static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
+ /** Do nothing. */
+ public void validateInput(JobConf job) {}
+
+ /**
+ * Produce splits so that each contains roughly the same number of
+ * operations: the total operation count divided by the number requested.
+ * @param job The handle to the JobConf object
+ * @param numSplits Number of splits requested
+ */
+ public InputSplit[] getSplits(JobConf job, int numSplits
+ ) throws IOException {
+ final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
+ final int targetcount = srcCount / numSplits;
+ String srclist = job.get(OP_LIST_LABEL, "");
+ if (srcCount < 0 || "".equals(srclist)) {
+ throw new RuntimeException("Invalid metadata: #files(" + srcCount +
+ ") listuri(" + srclist + ")");
+ }
+ Path srcs = new Path(srclist);
+ FileSystem fs = srcs.getFileSystem(job);
+
+ List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+
+ Text key = new Text();
+ FileOperation value = new FileOperation();
+ SequenceFile.Reader in = null;
+ long prev = 0L;
+ int count = 0; //count src
+ try {
+ for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
+ long curr = in.getPosition();
+ long delta = curr - prev;
+ if (++count > targetcount) {
+ count = 0;
+ splits.add(new FileSplit(srcs, prev, delta, (String[])null));
+ prev = curr;
+ }
+ }
+ }
+ finally {
+ if (in != null) { in.close(); } //guard: the Reader constructor may have thrown
+ }
+ long remaining = fs.getFileStatus(srcs).getLen() - prev;
+ if (remaining != 0) {
+ splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
+ }
+ LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size());
+ return splits.toArray(new FileSplit[splits.size()]);
+ }
+
+ /** {@inheritDoc} */
+ public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader<Text, FileOperation>(job,
+ (FileSplit)split);
+ }
+ }
+
+ /** The mapper for changing files. */
+ static class ChangeFilesMapper
+ implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
+ private JobConf jobconf;
+ private boolean ignoreFailures;
+
+ private int failcount = 0;
+ private int succeedcount = 0;
+
+ private String getCountString() {
+ return "Succeeded: " + succeedcount + " Failed: " + failcount;
+ }
+
+ /** {@inheritDoc} */
+ public void configure(JobConf job) {
+ this.jobconf = job;
+ ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
+ }
+
+ /** Run a FileOperation */
+ public void map(Text key, FileOperation value,
+ OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
+ ) throws IOException {
+ try {
+ value.run(jobconf);
+ ++succeedcount;
+ reporter.incrCounter(Counter.SUCCEED, 1);
+ } catch (IOException e) {
+ ++failcount;
+ reporter.incrCounter(Counter.FAIL, 1);
+
+ String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
+ out.collect(null, new Text(s));
+ LOG.info(s);
+ } finally {
+ reporter.setStatus(getCountString());
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ if (failcount == 0 || ignoreFailures) {
+ return;
+ }
+ throw new IOException(getCountString());
+ }
+ }
+
+ private static void check(Configuration conf, List<FileOperation> ops
+ ) throws InvalidInputException {
+ List<Path> srcs = new ArrayList<Path>();
+ for(FileOperation op : ops) {
+ srcs.add(op.src);
+ }
+ DistTool.checkSource(conf, srcs);
+ }
+
+ private static List<FileOperation> fetchList(Configuration conf, Path inputfile
+ ) throws IOException {
+ List<FileOperation> result = new ArrayList<FileOperation>();
+ for(String line : readFile(conf, inputfile)) {
+ result.add(new FileOperation(line));
+ }
+ return result;
+ }
+
+ /** This is the main driver for recursively changing file properties. */
+ public int run(String[] args) throws Exception {
+ List<FileOperation> ops = new ArrayList<FileOperation>();
+ Path logpath = null;
+ boolean isIgnoreFailures = false;
+
+ try {
+ for (int idx = 0; idx < args.length; idx++) {
+ if ("-f".equals(args[idx])) {
+ if (++idx == args.length) {
+ System.out.println("urilist_uri not specified");
+ System.out.println(USAGE);
+ return -1;
+ }
+ ops.addAll(fetchList(jobconf, new Path(args[idx])));
+ } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
+ isIgnoreFailures = true;
+ } else if ("-log".equals(args[idx])) {
+ if (++idx == args.length) {
+ System.out.println("logdir not specified");
+ System.out.println(USAGE);
+ return -1;
+ }
+ logpath = new Path(args[idx]);
+ } else if ('-' == args[idx].codePointAt(0)) {
+ System.out.println("Invalid switch " + args[idx]);
+ System.out.println(USAGE);
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ } else {
+ ops.add(new FileOperation(args[idx]));
+ }
+ }
+ // mandatory command-line parameters
+ if (ops.isEmpty()) {
+ throw new IllegalStateException("Operation is empty");
+ }
+ LOG.info("ops=" + ops);
+ LOG.info("isIgnoreFailures=" + isIgnoreFailures);
+ jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
+ check(jobconf, ops);
+
+ try {
+ if (setup(ops, logpath)) {
+ JobClient.runJob(jobconf);
+ }
+ } finally {
+ try {
+ if (logpath == null) {
+ //delete log directory
+ final Path logdir = FileOutputFormat.getOutputPath(jobconf);
+ if (logdir != null) {
+ logdir.getFileSystem(jobconf).delete(logdir, true);
+ }
+ }
+ }
+ finally {
+ //delete job directory
+ final String jobdir = jobconf.get(JOB_DIR_LABEL);
+ if (jobdir != null) {
+ final Path jobpath = new Path(jobdir);
+ jobpath.getFileSystem(jobconf).delete(jobpath, true);
+ }
+ }
+ }
+ } catch(DuplicationException e) {
+ LOG.error("Input error:", e);
+ return DuplicationException.ERROR_CODE;
+ } catch(Exception e) {
+ LOG.error(NAME + " failed: ", e);
+ System.out.println(USAGE);
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+ return 0;
+ }
+
+ /** Calculate how many maps to run. */
+ private static int getMapCount(int srcCount, int numNodes) {
+ int numMaps = (int)(srcCount / OP_PER_MAP);
+ numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
+ return Math.max(numMaps, 1);
+ }
+
+ private boolean setup(List<FileOperation> ops, Path log) throws IOException {
+ final String randomId = getRandomId();
+ JobClient jClient = new JobClient(jobconf);
+ Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+ LOG.info(JOB_DIR_LABEL + "=" + jobdir);
+
+ if (log == null) {
+ log = new Path(jobdir, "_logs");
+ }
+ FileOutputFormat.setOutputPath(jobconf, log);
+ LOG.info("log=" + log);
+
+ //create operation list
+ FileSystem fs = jobdir.getFileSystem(jobconf);
+ Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
+ jobconf.set(OP_LIST_LABEL, opList.toString());
+ int opCount = 0, synCount = 0;
+ SequenceFile.Writer opWriter = null;
+ try {
+ opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
+ FileOperation.class, SequenceFile.CompressionType.NONE);
+ for(FileOperation op : ops) {
+ FileStatus srcstat = fs.getFileStatus(op.src);
+ if (srcstat.isDir() && op.isDifferent(srcstat)) {
+ ++opCount;
+ opWriter.append(new Text(op.src.toString()), op);
+ }
+
+ Stack<Path> pathstack = new Stack<Path>();
+ for(pathstack.push(op.src); !pathstack.empty(); ) {
+ for(FileStatus stat : fs.listStatus(pathstack.pop())) {
+ if (stat.isDir()) {
+ pathstack.push(stat.getPath());
+ }
+
+ if (op.isDifferent(stat)) {
+ ++opCount;
+ if (++synCount > SYNC_FILE_MAX) {
+ opWriter.sync();
+ synCount = 0;
+ }
+ Path f = stat.getPath();
+ opWriter.append(new Text(f.toString()), new FileOperation(f, op));
+ }
+ }
+ }
+ }
+ } finally {
+ if (opWriter != null) { opWriter.close(); } //guard: createWriter may have thrown
+ }
+
+ checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
+ jobconf.setInt(OP_COUNT_LABEL, opCount);
+ LOG.info(OP_COUNT_LABEL + "=" + opCount);
+ jobconf.setNumMapTasks(getMapCount(opCount,
+ new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
+ return opCount != 0;
+ }
+
+ private static void checkDuplication(FileSystem fs, Path file, Path sorted,
+ Configuration conf) throws IOException {
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+ new Text.Comparator(), Text.class, FileOperation.class, conf);
+ sorter.sort(file, sorted);
+ SequenceFile.Reader in = null;
+ try {
+ in = new SequenceFile.Reader(fs, sorted, conf);
+ FileOperation curop = new FileOperation();
+ Text prevsrc = null, cursrc = new Text();
+ for(; in.next(cursrc, curop); ) {
+ if (prevsrc != null && cursrc.equals(prevsrc)) {
+ throw new DuplicationException(
+ "Invalid input, there are duplicated files in the sources: "
+ + prevsrc + ", " + cursrc);
+ }
+ prevsrc = cursrc;
+ cursrc = new Text();
+ curop = new FileOperation();
+ }
+ }
+ finally {
+ if (in != null) { in.close(); } //guard: the Reader constructor may have thrown
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
+ }
+}
\ No newline at end of file
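
Two details of DistCh.java above are worth a worked example. First,
getMapCount() caps parallelism at min(srcCount/1000, 20 maps per node)
with a floor of one map; e.g. 45,000 queued operations on a 3-tracker
cluster yield min(45, 60) = 45 maps. Second, the DistCh constructor is
package-private, so programmatic use goes through ToolRunner from within
the same package. A minimal, hypothetical driver (class name is
illustrative):

  package org.apache.hadoop.tools;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ToolRunner;

  public class DistChDriver {
    public static void main(String[] args) throws Exception {
      // Equivalent to: hadoop org.apache.hadoop.tools.DistCh /user/foo:foo:bar:700
      int rc = ToolRunner.run(new DistCh(new Configuration()),
          new String[] {"/user/foo:foo:bar:700"});
      System.exit(rc);
    }
  }

TestDistCh above does essentially the same thing with
new DistCh(mr.createJobConf()).run(args).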
Added: hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java?rev=719152&view=auto
==============================================================================
--- hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java (added)
+++ hadoop/core/trunk/src/tools/org/apache/hadoop/tools/DistTool.java Wed Nov 19 17:31:39 2008
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * An abstract base class for distributed tools for file-related operations.
+ */
+abstract class DistTool implements org.apache.hadoop.util.Tool {
+ protected static final Log LOG = LogFactory.getLog(DistTool.class);
+
+ protected JobConf jobconf;
+
+ /** {@inheritDoc} */
+ public void setConf(Configuration conf) {
+ if (jobconf != conf) {
+ jobconf = conf instanceof JobConf? (JobConf)conf: new JobConf(conf);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public JobConf getConf() {return jobconf;}
+
+ protected DistTool(Configuration conf) {setConf(conf);}
+
+ private static final Random RANDOM = new Random();
+ protected static String getRandomId() {
+ return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
+ }
+
+ /** Sanity check for source */
+ protected static void checkSource(Configuration conf, List<Path> srcs
+ ) throws InvalidInputException {
+ List<IOException> ioes = new ArrayList<IOException>();
+ for(Path p : srcs) {
+ try {
+ if (!p.getFileSystem(conf).exists(p)) {
+ ioes.add(new FileNotFoundException("Source "+p+" does not exist."));
+ }
+ }
+ catch(IOException e) {ioes.add(e);}
+ }
+ if (!ioes.isEmpty()) {
+ throw new InvalidInputException(ioes);
+ }
+ }
+
+ protected static String readString(DataInput in) throws IOException {
+ if (in.readBoolean()) {
+ return Text.readString(in);
+ }
+ return null;
+ }
+
+ protected static void writeString(DataOutput out, String s
+ ) throws IOException {
+ boolean b = s != null;
+ out.writeBoolean(b);
+ if (b) {Text.writeString(out, s);}
+ }
+
+ protected static List<String> readFile(Configuration conf, Path inputfile
+ ) throws IOException {
+ List<String> result = new ArrayList<String>();
+ FileSystem fs = inputfile.getFileSystem(conf);
+ BufferedReader input = null;
+ try {
+ input = new BufferedReader(new InputStreamReader(fs.open(inputfile)));
+ for(String line; (line = input.readLine()) != null;) {
+ result.add(line);
+ }
+ } finally {
+ if (input != null) { input.close(); } //guard: fs.open may have thrown
+ }
+ return result;
+ }
+
+ /** An exception class for duplicated source files. */
+ public static class DuplicationException extends IOException {
+ private static final long serialVersionUID = 1L;
+ /** Error code for this exception */
+ public static final int ERROR_CODE = -2;
+ DuplicationException(String message) {super(message);}
+ }
+}
\ No newline at end of file
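
One convention in DistTool.java worth spelling out: writeString() and
readString() serialize a possibly-null string as a boolean presence flag
followed by a Text payload, which is what lets FileOperation carry null
owner/group fields through the SequenceFile op list. A self-contained
round-trip sketch (hypothetical class name; only hadoop's Text is
assumed):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInput;
  import java.io.DataInputStream;
  import java.io.DataOutput;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.Text;

  public class NullableStringRoundTrip {
    static void write(DataOutput out, String s) throws IOException {
      boolean present = s != null;      // presence flag first
      out.writeBoolean(present);
      if (present) { Text.writeString(out, s); }
    }
    static String read(DataInput in) throws IOException {
      return in.readBoolean()? Text.readString(in): null;
    }
    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      write(out, "szetszwo");
      write(out, null);                 // e.g. an unchanged owner field
      DataInputStream in = new DataInputStream(
          new ByteArrayInputStream(buf.toByteArray()));
      System.out.println(read(in));     // szetszwo
      System.out.println(read(in));     // null
    }
  }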